## Аггрегация разметки датасета RCB

Аггрегация строится по следующей системе:

1. Сбор размеченных пулов с Толоки. Возможны варианты:
    - только общий пул нужно аггрегировать, тогда забирается только он
    - часть данных находится в контрольных заданиях и экзамене, тогда к основному пулу добавляются данные задания
2. Фильтрация разметчиков:
    - в общем пуле есть некоторое количество заранее размеченных заданий - контрольных
    - хорошим считается разметчик, который показывает `accuracy >= 0.5` на данных заданиях
    - формируется список "плохих" разметчиков
3. Аггрегация ответов разметчиков по заданиям:
    - форматирование в заданиях может отличаться от изначального из-за выгрузки с Толоки
    - учитываются только ответы "хороших" разметчиков
    - аггрегация по подготовленным пулам - создается массив карточек вида {key: value}, где key - кортеж из всех значимых элементов задания, value - список из кортежей вида (user_id, answer)
4. Голосование большинством по каждому заданию:
    - минимально необходимое большинство составляет 3 голоса, так как такое большинство валидно для перекрытия 5
    - по результату формируется датафрейм с заданиями и ответами
5. Подгрузка оригинальных данных с разметкой в виде таблицы с заданиями и ответами
6. Соединение таблиц:
    - очистка форматирования в таблице с ответами разметчиков и в таблице с правильными ответами
    - создание единых столбцов с полным заданием
    - соединение таблиц по данному столбцу
    - валидация размеров
7. Подсчет метрик

In [1]:
import pandas as pd
import numpy as np
from collections import Counter
from sklearn.metrics import f1_score

### Сбор данных разметки и фильтрация разметчиков

Датасет для разметки состоит из 525 объектов.

In [2]:
assignments = pd.read_csv('assignments_from_pool_42365874__11-12-2023.tsv', sep='\t')
skills = pd.read_csv('workerSkills.csv', sep='|')

Разметчикам предлагалось на гипотезы и ситуации определить их взаимосвязь: гипотеза может либо следовать из ситуации, либо противоречить ей, либо не быть с ней связанной.
Вход: 
- INPUT:premise (пример: `Все утро шел сильный дождь.`).
- INPUT:hypothesis (пример: `Я весь вымок.`).

Выход:
- OUTPUT:answer (одна из трех цифр: `1`, `2`, `3`).

In [4]:
assignments.head(1)

Unnamed: 0,INPUT:premise,INPUT:hypothesis,OUTPUT:answer,GOLDEN:answer,HINT:text,HINT:default_language,ASSIGNMENT:link,ASSIGNMENT:task_id,ASSIGNMENT:assignment_id,ASSIGNMENT:task_suite_id,ASSIGNMENT:worker_id,ASSIGNMENT:status,ASSIGNMENT:started,ASSIGNMENT:submitted,ASSIGNMENT:accepted,ASSIGNMENT:reward
0,В ближайшие несколько месяцев будут представле...,Сразу несколько аппаратов станут новым витком ...,1,,,,https://platform.toloka.ai/task/42365874/00028...,00028673b2--65661c962eae777d413ceec5,00028673b2--656626d8cb94b61ac9a31ba0,00028673b2--656626d8cb94b61ac9a31b9b,613cd9c009dce0c9dbdfed7a4fc2aa64,APPROVED,2023-11-28T17:43:52.392,2023-11-28T17:45:03.448,2023-11-28T17:45:03.448,0.03


Фильтруем толокеров с `accuracy < 0.5` на контрольных заданиях, чтобы не учитывать их ответы при подсчете метрик.

In [5]:
from collections import defaultdict

users_dict = defaultdict(lambda: defaultdict(int))

for idx, row in assignments.iterrows():
    text = row[0]

    out = row[2]
    
    gold = row[3]

    user = row[10]

    if str(user) != "nan" and str(gold) != "nan":
        if int(out) == int(gold):
            users_dict[user]["good"] += 1
        else:
            users_dict[user]["bad"] += 1

print("Users total: ", len(users_dict))
bad_users = []
for key, value in users_dict.items():
    percentage_good = value["good"]/(value["good"] + value["bad"])
    if percentage_good < 0.5:
        bad_users.append(key)

print("Bad users:", len(bad_users))

Users total:  229
Bad users: 112


112 из 229 разметчиков на контрольных заданиях показали слишком плохое качество, чтобы учитывать их ответы для расчета метрики.

Отделяем контроль от основы, так как контрольные задания создавались отдельно и не должны учитываться при подсчете метрик. На контрольных заданиях есть `GOLDEN:answer`. Также отсеиваем возможные баги Толоки, когда в строке может не быть задания - `INPUT:hypothesis` содержит NaN.

In [6]:
assignments_no_control = assignments[assignments['GOLDEN:answer'].isnull()]
assignments_no_control_no_null = assignments_no_control[assignments_no_control['INPUT:hypothesis'].notnull()]

Посчитаем, сколько было затрачено на получение разметки тестовых данных без учета контрольных заданий, так как они могли проходиться неограниченное количество раз одним и тем же разметчиком.

In [7]:
def w_sum(df):
    idx = df.index.values
    vals = df.values
    summ = idx * vals
    return summ.sum()

d1 = assignments_no_control_no_null['ASSIGNMENT:reward'].value_counts(normalize=True)
d2 = assignments_no_control_no_null['ASSIGNMENT:reward'].value_counts()
print(f'взвешенная цена айтема в тесте: {round(w_sum(d1), 3)}')
print(f'потрачено на разметку теста: {round(w_sum(d2), 3)}')
print(f'{round(w_sum(d2), 3)} / {round(w_sum(d1), 3)}')

взвешенная цена айтема в тесте: 0.034
потрачено на разметку теста: 73.464
73.464 / 0.034


Выделим, сколько составила средняя часовая ставка для разметки тестовой части датасета. Это будет простое среднее из следующих величин: количество заданий, которое разметчк может сделать за час на основе данного задания, помноженное на цену задания.

In [8]:
def get_hour_pay(df):
    try:
        times = pd.to_datetime(df['ASSIGNMENT:submitted']) - pd.to_datetime(df['ASSIGNMENT:started'])
    except Exception as e:
        times = []
        for i in range(len(assignments_no_control_no_null)):
            try:
                start = pd.to_datetime(assignments_no_control_no_null['ASSIGNMENT:started'].iloc[i])
            except Exception as e:
                start = pd.to_datetime(assignments_no_control_no_null['ASSIGNMENT:started'].apply(lambda x: x.split('T')[1]).iloc[i])
            try:
                end = pd.to_datetime(assignments_no_control_no_null['ASSIGNMENT:submitted'].iloc[i])
            except Exception as e:
                start = pd.to_datetime(assignments_no_control_no_null['ASSIGNMENT:submitted'].apply(lambda x: x.split('T')[1]).iloc[i])
            delta = end - start
            times.extend([delta])
        times = pd.Series(times)
        # times = pd.to_datetime(df['ASSIGNMENT:submitted'].apply(lambda x: x.split('T')[1])) - pd.to_datetime(df['ASSIGNMENT:started'].apply(lambda x: x.split('T')[1]))
    sums = 3600 / times.apply(lambda x: x.seconds) * df['ASSIGNMENT:reward']
    return sums.mean()

get_hour_pay(assignments_no_control_no_null)

2.6095295111869774

### Сбор ответов разметчиков и голосование

Собираем ответы голосования большинством для каждого задания.

In [9]:
from collections import defaultdict

text_dict = defaultdict(list)

for premise, hypothesis, user, out in zip(
    assignments_no_control_no_null["INPUT:premise"], assignments_no_control_no_null["INPUT:hypothesis"],
    assignments_no_control_no_null["ASSIGNMENT:worker_id"], assignments_no_control_no_null["OUTPUT:answer"]
    ):
    if user not in bad_users:
        text_dict[(premise, hypothesis)].append([
                user,
                {"out": out}
        ])

print(len(text_dict))

438


In [10]:
keys = list(text_dict.keys())
Counter([len(text_dict[keys[i]]) for i in range(len(keys))])

Counter({5: 172, 4: 168, 3: 76, 2: 18, 1: 4})

Есть 110 заданий, где перекрытие меньше 5. Для формирования итоговых лейблов нужно, чтобы было простое большинство разметчиков, проголосовавших за данную опцию. Если большинства нет, то оценка строится, исходя из оценки навыков разметчиков. В таком случае, финальный лейбл будет присвоен по голосу группы с наилучшими навыками.

In [11]:
preds_full = {}
user2skill = {k:v for k, v in zip(skills['worker_id'], skills['skill_value'])}
control_acc = assignments[assignments['GOLDEN:answer'].notna()]\
    .groupby('ASSIGNMENT:worker_id')\
        .apply(lambda x: (np.array(x['OUTPUT:answer']) == np.array(x['GOLDEN:answer'])).mean())
user2control = {k:v for k, v in zip(control_acc.index, control_acc.values)}

from crowdkit.aggregation.classification.glad import GLAD

full = assignments['INPUT:premise'] + ' ' + assignments['INPUT:hypothesis']
id2task = dict(enumerate(full))
task2id = {k:v for v, k in id2task.items()}
id2user = dict(enumerate(assignments['ASSIGNMENT:worker_id']))
user2id = {k:v for v, k in id2user.items()}

codes = full.map(task2id)
res = pd.DataFrame({'task': codes, 'worker': assignments['ASSIGNMENT:worker_id'].map(user2id), 'label': assignments['OUTPUT:answer']})
model = GLAD(n_iter=10000, tol=1e-06, m_step_max_iter=1000, m_step_tol=1e-03)
model.fit(res)
user2alpha = dict(enumerate(model.alphas_))
tb = model.alphas_.copy()
tb.index = tb.index.map(id2user)
user2alpha = {k:v for k, v in zip(tb.index, tb.values)}

stats = {
    'total_agreement': 0,
    'majority': 0,
    'skill_based': 0,
    'major_based': 0,
    'em_based': 0,
    'rest': 0,
}

for i in range(len(keys)):
    ans = text_dict[keys[i]]
    lst = [[ans[j][0], ans[j][1]['out']] for j in range(len(ans))]
    users, votes = list(zip(*lst))
    cnt = pd.Series(Counter(votes)).sort_values(ascending=False)

    # # total agreement
    if len(cnt) == 1:
        res = cnt.index[0]
        stats['total_agreement'] += 1
    # simple majority
    elif cnt.iloc[0] > cnt.iloc[1]:
        res = cnt.index[0]
        stats['majority'] += 1
    # (> 1 options) & (1 option == 2 option)
    else:
        # try overall skill based comparison
        vals = list(map(lambda x: user2skill[x], users))
        table = pd.DataFrame({'user': users, 'votes': votes, 'skill': vals})
        agg = table.groupby('votes').agg(
            sum_skill=pd.NamedAgg(column='skill', aggfunc='sum'),
            sum_votes=pd.NamedAgg(column='user', aggfunc='count')
        ).sort_values(by=['sum_votes', 'sum_skill'], ascending=False)
        # check there is a leader by skills
        if agg['sum_skill'].iloc[0] > agg['sum_skill'].iloc[1]:
            res = agg.index[0]
            stats['skill_based'] += 1
        else:
            # top-3 answers by overall skills
            vals = list(map(lambda x: user2skill[x], users))
            table = pd.DataFrame({'user': users, 'votes': votes, 'skill': vals})
            table = table.sort_values(by='skill', ascending=False)
            if len(table) >= 3:
                sub = table.iloc[:3]
            else:
                sub = table
            agg = sub.groupby('votes').agg(
                sum_skill=pd.NamedAgg(column='skill', aggfunc='sum'),
                sum_votes=pd.NamedAgg(column='user', aggfunc='count')
            ).sort_values(by=['sum_votes', 'sum_skill'], ascending=False)
            if agg['sum_skill'].iloc[0] != agg['sum_skill'].iloc[1]:
                res = agg.index[0]
                stats['major_based'] += 1
            
            else:
                vals = list(map(lambda x: user2alpha[x], users))
                table = pd.DataFrame({'user': users, 'votes': votes, 'skill': vals})
                agg = table.groupby('votes').agg(
                    sum_skill=pd.NamedAgg(column='skill', aggfunc='sum'),
                    sum_votes=pd.NamedAgg(column='user', aggfunc='count')
                ).sort_values(by=['sum_votes', 'sum_skill'], ascending=False)
                # check there is a leader by skills
                if agg['sum_skill'].iloc[0] != agg['sum_skill'].iloc[1]:
                    res = agg.index[0]
                    stats['em_based'] += 1
                else:
                    res = agg.index[0]
                    stats['rest'] += 1

    preds_full[keys[i]] = res

In [12]:
stats

{'total_agreement': 218,
 'majority': 190,
 'skill_based': 3,
 'major_based': 16,
 'em_based': 11,
 'rest': 0}

In [13]:
preds_full_df = pd.concat([pd.DataFrame(preds_full.keys(), columns=['premise', 'hypothesis']), pd.DataFrame(preds_full.values(), columns=['lb'])], axis=1).astype(str)

### Сопоставление разметки и ground truth

Забираем задания из датасета с правильными ответами.

In [14]:
res_df = pd.read_csv('general_wa.tsv', sep='\t')

In [15]:
res_df = res_df.rename({
    'INPUT:premise': 'premise',
    'INPUT:hypothesis': 'hypothesis',
    'GOLDEN:answer': 'lb',
}, axis=1)

После скачивания с Толоки в текстах рушится форматирование, потому нельзя просто сделать join двух табличек. Нужно убрать все "лишнее" форматирование сразу из двух табличек, чтобы остались только тексты, пунктуация и пробелы.

In [16]:
res_df.head(1)

Unnamed: 0,lb,premise,hypothesis
0,3,"Мужчина уже был ранее судим, рассказали «Комсо...",Мужчину раньше уже судили.


In [17]:
def format_text(text):
    text = (text.strip().replace('\n', ' ').replace('\t', ' ')
            .replace('\r', ' ').replace('  ', ' ').replace('  ', ' ')
            .replace('  ', ' '))
    return text

res_df['premise'] = res_df['premise'].apply(format_text)
res_df['hypothesis'] = res_df['hypothesis'].apply(format_text)

preds_full_df['premise'] = preds_full_df['premise'].apply(format_text)
preds_full_df['hypothesis'] = preds_full_df['hypothesis'].apply(format_text)

res_df['full'] = res_df['premise'] + ' ' + res_df['hypothesis']
preds_full_df['full'] = preds_full_df['premise'] + ' ' + preds_full_df['hypothesis']

Делаем left join, чтобы соединить голосование и правильные метки для одних и тех же заданий.

In [18]:
new = res_df.merge(preds_full_df.drop(['premise', 'hypothesis'], axis=1), on='full', how='left')

In [19]:
new_valid = new[new['lb_y'].notna()].copy()
len(new_valid)

438

In [20]:
new_valid['lb_y'] = new_valid['lb_y'].astype(int)

Ни одна строка не была утеряна.

In [21]:
new_valid.head(1)

Unnamed: 0,lb_x,premise,hypothesis,full,lb_y
0,3,"Мужчина уже был ранее судим, рассказали «Комсо...",Мужчину раньше уже судили.,"Мужчина уже был ранее судим, рассказали «Комсо...",1


### Подсчет метрики

Если в правом столбце меток осталось 438 непустых строк, значит, форматирование было подчищено корректно и ничего не потерялось

Попробуем посчитать разные метрики

In [22]:
(new_valid['lb_x'] == new_valid['lb_y']).mean()

0.58675799086758

In [23]:
f1_score(new_valid['lb_x'], new_valid['lb_y'], average='macro')

0.5651979834870691

`Accuracy = 0.587`, `F1_macro = 0.565`