In [24]:
!pip install -U tiktoken
!pip install -U shortuuid
!pip install -U transformers==4.45.2
!pip install -U datasets
!pip install -U rouge-score
!pip install -U pymorphy3
!pip install -U peft
!pip install -U evalica

Collecting shortuuid
  Downloading shortuuid-1.0.13-py3-none-any.whl.metadata (5.8 kB)
Downloading shortuuid-1.0.13-py3-none-any.whl (10 kB)
Installing collected packages: shortuuid
Successfully installed shortuuid-1.0.13
Collecting transformers==4.45.2
  Downloading transformers-4.45.2-py3-none-any.whl.metadata (44 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m44.4/44.4 kB[0m [31m2.0 MB/s[0m eta [36m0:00:00[0m
Collecting tokenizers<0.21,>=0.20 (from transformers==4.45.2)
  Downloading tokenizers-0.20.3-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (6.7 kB)
Downloading transformers-4.45.2-py3-none-any.whl (9.9 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m9.9/9.9 MB[0m [31m51.2 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading tokenizers-0.20.3-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.0 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.0/3.0 MB[0m [31m50.9 MB/s[0m eta [36m0:0

In [25]:
from typing import List, Dict, Tuple
import copy
import pandas as pd
import numpy as np
from tqdm import tqdm
from evalica import bradley_terry, Winner, pairwise_frame
from functools import partial
from scipy.special import expit
from scipy.optimize import minimize
from math import log
from random import randint
import re

tqdm.pandas()

In [90]:
# Names of the style features, listed in the same order as the keys of the dict
# built by count_style_elements. calculate_style uses this list by default to
# decide how many per-answer feature values to read.
STYLE_CONTROL_ELEMENTS = [
    "len_answer",
    "header_count",
    "list_count",
    "bold_count",
    "code_blocks_count"
]

# Sign pattern applied per matchup in contextual_bt_loss_and_grad: the gradient
# term is added with +1 for the first competitor and -1 for the second.
DIFF_MASK = np.array([1.0, -1.0], dtype=np.float64)

def count_style_elements(markdown_text):
    """Count markdown style elements in a single answer.

    Args:
        markdown_text: The answer text (markdown).

    Returns:
        A dict with the keys ``len_answer`` (int, number of characters),
        ``header_count`` (h1..h6), ``list_count`` (ordered / unordered),
        ``bold_count`` (``**`` / ``__``) and ``code_blocks_count``
        (inline `` ` `` / fenced ```` ``` ````), in that insertion order.
        Every key except ``len_answer`` maps to a dict of integer counts.
    """
    len_answer = len(markdown_text)

    # Remove fenced code blocks as whole matches. The previous implementation
    # captured only the inner text and str.replace()'d it everywhere, which also
    # deleted unrelated occurrences of that text outside the code span.
    text, code_count = re.subn(r"```[^`]+```", "", markdown_text)
    # Strip any unmatched fence markers so they are not read as inline code.
    text = text.replace("```", "")

    # Inline code spans are replaced by an empty pair of backticks so that
    # wrappers such as **`name`** still count as bold, while markup inside the
    # code span itself is ignored.
    text, mono_count = re.subn(r"`[^`]+`", "``", text)

    def count(pattern, flags=0):
        return len(re.findall(pattern, text, flags))

    counters = {
        "len_answer": len_answer,
        "header_count": {
            f"h{level}": count(rf"^#{{{level}}}\s", re.MULTILINE)
            for level in range(1, 7)
        },
        "list_count": {
            "ordered": count(r"^\s*\d+\.\s", re.MULTILINE),
            "unordered": count(r"^\s*[-*+]\s", re.MULTILINE),
        },
        "bold_count": {
            "**": count(r"\*\*[^*\n]+\*\*"),
            "__": count(r"__[^_\n]+__"),
        },
        "code_blocks_count": {
            "`": mono_count,
            "```": code_count,
        },
    }
    return counters


def extract_style_feature(x, feature):
    """Return a single number for ``feature`` from a style-counter dict.

    Integer entries are returned unchanged; any other entry is treated as a
    mapping of sub-counts and the sum of its values is returned.
    """
    entry = x[feature]
    return entry if isinstance(entry, int) else sum(entry.values())


def get_element_counts(text):
    """Return the style features of ``text`` as a flat list of numbers.

    The list follows the key order of the dict produced by
    count_style_elements, with each entry reduced by extract_style_feature.
    """
    counters = count_style_elements(text)
    return [extract_style_feature(counters, name) for name in counters]


def calculate_style(
    model_a: pd.Series,
    model_b: pd.Series,
    style_elements: "list[str] | None" = None
):
    """Build the standardized style-difference feature matrix for all battles.

    For every style feature the relative difference ``(a - b) / (a + b)``
    between the two answers is computed and then z-scored across battles.

    Args:
        model_a: One entry per battle; each entry is an indexable sequence of
            per-feature values for the first answer.
        model_b: Same layout for the second answer.
        style_elements: Feature names; only the length of the list is used, to
            decide how many leading values of each entry to read. ``None``
            (default) means the module-level ``STYLE_CONTROL_ELEMENTS``,
            looked up at call time.

    Returns:
        ``np.ndarray`` of shape ``(n_battles, n_features)``.
    """
    if style_elements is None:
        style_elements = STYLE_CONTROL_ELEMENTS
    n_features = len(style_elements)
    n_battles = model_a.shape[0]

    style_a = np.zeros((n_features, n_battles), dtype=np.float64)
    style_b = np.zeros((n_features, n_battles), dtype=np.float64)
    for idx in range(n_features):
        style_a[idx, :] = [el[idx] for el in model_a]
        style_b[idx, :] = [el[idx] for el in model_b]

    # Compare feature i of A with feature i of B. The previous code subtracted
    # the single row `style_matrix[n_features]` (B's first feature) from every
    # feature of A instead of the slice `style_matrix[n_features:]`.
    style_diff = style_a - style_b
    style_sum = style_a + style_b

    # When neither answer contains the element the sum is 0; treat that as
    # "no difference" instead of producing NaN from 0/0.
    normalized = np.divide(
        style_diff, style_sum, out=np.zeros_like(style_diff), where=style_sum != 0
    )

    style_mean = np.mean(normalized, axis=1, keepdims=True)
    style_std = np.std(normalized, axis=1, keepdims=True)
    # A feature that is constant across battles carries no signal; dividing by
    # 1 leaves it as an all-zero column rather than NaN.
    safe_std = np.where(style_std > 0, style_std, 1.0)

    return ((normalized - style_mean) / safe_std).T


def get_matchups_models(model_a: pd.Series, model_b: pd.Series):
    """Encode model names as integer ids and pair them up per battle.

    Returns:
        ``(matchups, models)`` where ``matchups`` is an ``(n_rows, 2)`` integer
        array holding the ids of ``model_a`` and ``model_b`` for each row, and
        ``models`` is the list of unique names such that ``models[id]`` is the
        name behind an id.
    """
    n_rows = len(model_a)
    assert len(model_b) == n_rows
    # Factorize both sides together so that the ids share one vocabulary.
    codes, uniques = pd.factorize(pd.concat([model_a, model_b]))
    left_ids, right_ids = codes[:n_rows], codes[n_rows:]
    return np.stack([left_ids, right_ids], axis=1), uniques.tolist()


def contextual_bt_loss_and_grad(
    params,
    n_competitors,
    matchups,
    features,
    outcomes,
    alpha=1.0,
    reg=1.0,
    half_reg=0.5,
):
    """Loss and gradient of a Bradley-Terry model with extra context features.

    ``params`` holds ``n_competitors`` ratings followed by one weight per
    feature column. The win probability of the first competitor of a matchup
    is ``expit(alpha * (r_first - r_second) + features @ weights)``.

    Returns:
        ``(loss, grad, context_probs)``: the negative log-likelihood plus
        ``half_reg * ||params||^2``, its gradient (penalty part is
        ``reg * params``), and ``expit(features @ weights)`` per matchup.
    """
    ratings, feature_params = params[:n_competitors], params[n_competitors:]

    paired_ratings = ratings[matchups]
    bt_logits = alpha * (paired_ratings[:, 0] - paired_ratings[:, 1])
    context_logits = np.dot(features, feature_params)
    probs = expit(bt_logits + context_logits)

    log_likelihood = (
        np.log(probs) * outcomes + np.log(1.0 - probs) * (1.0 - outcomes)
    ).sum()
    loss = -log_likelihood + half_reg * np.inner(params, params)

    residual = outcomes - probs
    grad = reg * params
    rating_terms = -alpha * residual
    # grad[:n_competitors] is a view, so the scatter-add below updates `grad`;
    # np.add.at accumulates correctly when a competitor appears in many rows.
    np.add.at(
        grad[:n_competitors], matchups[:, [0, 1]], rating_terms[:, None] * DIFF_MASK
    )
    grad[n_competitors:] -= np.dot(features.T, residual)

    return loss, grad, expit(context_logits)


def fit_contextual_bt(
    matchups,
    features,
    outcomes,
    models,
    idxs=None,
    alpha=log(10.0),
    reg=0.5,
    tol=1e-6,
):
    """Fit the contextual Bradley-Terry model with L-BFGS-B.

    Args:
        matchups, features, outcomes: Per-battle arrays passed on to
            contextual_bt_loss_and_grad.
        models: Sequence of competitors; only its length is used here.
        idxs: Optional row selector applied to all three per-battle arrays.
        alpha, reg: Forwarded to the objective (``half_reg`` is ``reg / 2``).
        tol: Passed to the optimizer as ``gtol``.

    Returns:
        ``(params, context_probs)``: the optimized parameter vector and the
        third value of the objective evaluated at those parameters.
    """
    n_models = len(models)
    start = np.zeros(n_models + features.shape[1], dtype=np.float64)

    if idxs is not None:
        matchups = matchups[idxs]
        features = features[idxs]
        outcomes = outcomes[idxs]

    objective_args = (n_models, matchups, features, outcomes, alpha, reg, reg / 2.0)
    result = minimize(
        fun=contextual_bt_loss_and_grad,
        x0=start,
        args=objective_args,
        jac=True,
        method="L-BFGS-B",
        options={"disp": False, "maxiter": 100, "gtol": tol},
    )
    fitted = result["x"]
    # Re-evaluate once at the optimum to obtain the per-battle context output.
    context_logits = contextual_bt_loss_and_grad(fitted, *objective_args)[2]
    return fitted, context_logits


def compute_style_control(
    df: pd.DataFrame,
    alpha=log(10.0), reg=0.5, tol=1e-6
):
    """Fit style-controlled ratings for the battles in ``df``.

    ``df`` must provide the columns ``model_a``, ``model_b``, ``winner``,
    ``model_a_style`` and ``model_b_style``.

    Returns:
        ``(ratings, models, context_logits)``: the first ``len(models)`` fitted
        parameters, the model names in matching order, and the per-battle
        context output returned by fit_contextual_bt.
    """
    features = calculate_style(df.model_a_style, df.model_b_style)
    matchups, models = get_matchups_models(df.model_a, df.model_b)
    params, context_logits = fit_contextual_bt(
        matchups,
        features,
        df.winner.values,
        models=models,
        alpha=alpha,
        reg=reg,
        tol=tol,
    )
    # The leading entries are the model ratings; the remaining ones are the
    # style-feature weights, which are not returned.
    n_models = len(models)
    return params[:n_models], models, context_logits

def scale_and_offset(
    ratings,
    models=[],
    baseline_model='',
    scale=400,
    init_rating=1000,
    baseline_rating=1114,
):
    """Map natural-scale ratings to an Elo-like scale.

    Computes ``ratings * scale + init_rating``. If ``baseline_model`` is given
    and present in ``models``, all ratings are additionally shifted so that the
    baseline model ends up exactly at ``baseline_rating``.
    """
    scaled = (ratings * scale) + init_rating
    anchored = baseline_model and models and baseline_model in models
    if not anchored:
        return scaled
    anchor_idx = models.index(baseline_model)
    # The list index keeps the last axis so the shift broadcasts over it.
    scaled += baseline_rating - scaled[..., [anchor_idx]]
    return scaled

In [28]:
import json
from typing import List, Dict, Tuple, Union
import pandas as pd

In [42]:
def save_battles_to_json(filename: str, results: List[Dict]=[]):
    """Write ``results`` as a JSON array with one record per line.

    Nothing is written when ``results`` is empty. An empty ``filename`` falls
    back to ``battles_saved_file.json``. Any error while writing is reported
    with a printed message instead of being raised.
    """
    if not results:
        return
    filename = filename or "battles_saved_file.json"
    try:
        with open(filename, 'w', encoding='utf-8') as out:
            out.write('[\n')
            last_pos = len(results) - 1
            for pos, item in enumerate(results):
                # Every record except the last is followed by a comma.
                separator = ',' if pos < last_pos else ''
                out.write(f'  {json.dumps(item, ensure_ascii=False)}{separator}\n')
            out.write(']')
    except Exception as e:
        print(f"Error with file {filename}: {e}.")
    else:
        print(f"Data was stored to {filename}.")

In [91]:
def _prepare_data(
    results: List[Dict],
    len_control: bool=False,
    style_control: bool=True,
    pen_only_model: bool=False
  ) -> list:
    """Collapse raw judge records into one row per (id, model, reference) battle.

    Records sharing the same id / model / reference are merged; the battle is
    counted as a win for the model (``1``) when the mean ``p`` of the merged
    records is at least 0.5. Battles of a model against itself are skipped.

    Args:
        results: Records with keys ``p``, ``id``, ``model_name``,
            ``reference_model_name`` and, depending on the mode, ``styles``
            and/or ``lens``. The records are not modified.
        len_control: Weight battles by the normalized answer-length difference.
            Ignored when ``style_control`` is on.
        style_control: Attach the per-answer style vectors to every row.
        pen_only_model: With ``len_control``, apply the length weight only to
            battles the model won; the others keep the neutral weight 0.5.

    Returns:
        With ``style_control``: rows
        ``[model, reference, winner, model_style, reference_style]``.
        Otherwise: rows ``[model, reference, winner, weight]``.

    Raises:
        ValueError: If the data required by the selected mode is missing.
    """
    if style_control:
        for r in results:
            if 'styles' not in r:
                raise ValueError("If the 'style_control' mode is on, the data should contain information about the style of model answers.")

    # Length information is only consumed on the non-style path. It is collected
    # into a separate list so the caller's records are left untouched.
    use_len = len_control and not style_control
    lens = []
    if use_len:
        for r in results:
            if 'lens' in r:
                lens.append(r['lens'])
            elif 'styles' in r:
                # The first style feature is the answer length.
                lens.append({'model': r['styles']['model'][0], 'reference': r['styles']['reference'][0]})
            else:
                raise ValueError("If the 'len_control' mode is on, the data should contain information about the length of model answers.")

    df = pd.DataFrame()
    df['p'] = [r['p'] for r in results]
    df['full_hash'] = ['|'.join([str(r['id']), r['model_name'], r['reference_model_name']]) for r in results]
    df['model_hash'] = ['|'.join([str(r['id']), r['model_name']]) for r in results]
    df['reference_hash'] = ['|'.join([str(r['id']), r['reference_model_name']]) for r in results]
    df['model_name'] = [r['model_name'] for r in results]
    df['reference_model_name'] = [r['reference_model_name'] for r in results]

    answer_len_deltas = {}
    if style_control:
        df['model_style'] = [np.array(r['styles']['model']) for r in results]
        df['reference_style'] = [np.array(r['styles']['reference']) for r in results]
    elif use_len:
        df['model_len'] = [item['model'] for item in lens]
        df['reference_len'] = [item['reference'] for item in lens]
        # Spread of the length difference per reference model, used as the
        # normalization scale below.
        for ref_model_name, group in df.groupby('reference_model_name'):
            answer_len_deltas[ref_model_name] = (group['reference_len'] - group['model_len']).std()

    data = []
    for _, group in df.groupby('model_hash'):
        for _, subgroup in group.groupby('full_hash'):
            if (subgroup['model_name'] == subgroup['reference_model_name']).all():
                continue
            model_name = subgroup['model_name'].tolist()[0]
            reference_name = subgroup['reference_model_name'].tolist()[0]
            pred = int(subgroup['p'].mean() >= 0.5)

            if style_control:
                data.append([
                    model_name,
                    reference_name,
                    pred,
                    subgroup['model_style'].tolist()[0],
                    subgroup['reference_style'].tolist()[0],
                ])
                continue

            weight = 0.5
            if use_len and (pred or not pen_only_model):
                deltas_std = answer_len_deltas[reference_name]
                length_delta = (subgroup['reference_len'] - subgroup['model_len']).iloc[0]
                # A reference with a single battle has an undefined (NaN) std and
                # identical deltas give std == 0; in both cases there is no scale
                # to normalize by, so the neutral weight is kept instead of
                # propagating NaN or a saturated 0/1 weight.
                if length_delta != 0 and np.isfinite(deltas_std) and deltas_std > 0:
                    weight = expit(length_delta / deltas_std)
            data.append([model_name, reference_name, pred, weight])
    return data

In [31]:
def right_columns(df: pd.DataFrame, columns: list[str]) -> bool:
    """Return True when ``df`` is non-empty and contains every name in ``columns``."""
    if df.empty:
        return False
    return all(name in df.columns for name in columns)

In [60]:
def get_models_from_data(df: pd.DataFrame) -> list[str]:
    """List the distinct model names found in ``model_a`` and ``model_b``.

    Names are returned in order of first appearance, ``model_a`` first. An
    empty list is returned when the frame is empty or lacks either column.
    """
    if not right_columns(df, ['model_a', 'model_b']):
        return []
    both_sides = pd.concat([df.model_a, df.model_b])
    return both_sides.unique().tolist()

In [79]:
def _calculate_mean_scores(df: pd.DataFrame) -> pd.DataFrame:
    """Fit a weighted Bradley-Terry model and return the pairwise score table.

    Adapted from
    https://github.com/VikhrModels/ru_llm_arena/blob/master/show_result.py

    Args:
        df: Battles with columns ``model_a``, ``model_b``, ``winner`` (1 when
            ``model_a`` won, 0 otherwise) and ``answer_deltas`` (per-battle
            weights). The frame is not modified.

    Returns:
        The frame produced by evalica's ``pairwise_frame`` for the fitted
        scores (presumably pairwise win probabilities; see the evalica docs),
        with the diagonal set to NaN so a model's comparison with itself does
        not enter row means. An empty frame is returned when ``df`` is empty
        or lacks a required column.
    """
    if not right_columns(df, ['model_a', 'model_b', 'winner', 'answer_deltas']):
        print("Error in dataframe while computing mean scores.")
        return pd.DataFrame()

    # Map into a local Series. Overwriting df['winner'] would change the
    # caller's frame and make a second call map every value to NaN.
    winners = df['winner'].map({
        1: Winner.X,
        0: Winner.Y
    })
    result = bradley_terry(
        df['model_a'],
        df['model_b'],
        winners,
        weights=df['answer_deltas'],
        tolerance=1e-8
    )
    pairwise = pairwise_frame(result.scores)

    # Blank the diagonal on an explicit copy; writing through `.values` only
    # works while pandas hands back a writable view of the underlying data.
    values = pairwise.to_numpy(dtype=np.float64, copy=True)
    np.fill_diagonal(values, np.nan)
    return pd.DataFrame(values, index=pairwise.index, columns=pairwise.columns)

def _calculate_ratings(
    results: List[Dict]=[],
    style_control: bool=True,
    len_control: bool=False,
    pen_only_model: bool=False,
    get_mean_scores: bool=True,
    get_elo_ratings: bool=True,
    model: str='',
  ) -> Dict:
    '''
    Compute Elo-style ratings and/or mean pairwise scores from judge records.

    Expected input:
    results = [
        {
            'p': <model_proba_i>,
            'id': <id_i>,
            'model_name': <model_name_i>,
            'reference_model_name': <reference_model_name_i>,
            'styles': {
                'model':      [x, x, x, x, x],
                'reference':  [x, x, x, x, x], # [ len_answer, header_count, list_count, bold_count, code_blocks_count ]
            }
        } for i in range(2*n*m) # there were 2 samples per instruction
    ]

    Args:
        results: Judge records as described above.
        style_control, len_control, pen_only_model: Forwarded to _prepare_data.
        get_mean_scores: Fill ``scores['mean_scores']``.
        get_elo_ratings: Fill ``scores['elo']``; only possible together with
            ``style_control``, otherwise the dict stays empty.
        model: If given, mean scores are computed for this model only.

    Returns:
        ``{'elo': {model: rating}, 'mean_scores': {model: score}}``.

    Raises:
        ValueError: If there is no data or ``model`` is not in the data.
    '''
    if not results:
        raise ValueError("No data for computing!")
    data = _prepare_data(results, len_control=len_control, style_control=style_control, pen_only_model=pen_only_model)
    if not data:
        raise ValueError("No data for computing!")

    scores = {'elo': {}, 'mean_scores': {}}
    if style_control:
        columns = ['model_a', 'model_b', 'winner', 'model_a_style', 'model_b_style']
    else:
        columns = ['model_a', 'model_b', 'winner', 'answer_deltas']
    df = pd.DataFrame(data, columns=columns)

    models = get_models_from_data(df)
    if model and model not in models:
        raise ValueError(f"Model {model} is not in dataset!")

    if style_control:
        ratings, models, context_logits = compute_style_control(df)
        # The per-battle context output doubles as the weight for mean scores.
        df['answer_deltas'] = context_logits
        if get_elo_ratings:
            scaled_ratings = scale_and_offset(ratings)
            for i in range(len(models)):
                scores['elo'][models[i]] = scaled_ratings[i]

    if get_mean_scores:
        mean_frame = _calculate_mean_scores(df)
        if mean_frame.empty:
            # Previously the message was printed and the lookup below then
            # failed with KeyError; return what has been computed instead.
            print('Error in calculating mean scores.')
            return scores
        for m in ([model] if model else models):
            scores['mean_scores'][m] = mean_frame.loc[m].mean()
    return scores

In [75]:
def get_results_from_file(filename) -> List[Dict]:
    """Load and return the JSON content of ``filename`` (read as UTF-8)."""
    with open(filename, 'r', encoding='utf-8') as source:
        loaded: List[Dict] = json.load(source)
    return loaded

In [94]:
def get_ratings(
    sourse_files: Union[str, list[str]],
    model: str='',
    save_all_battles: bool=True,
    newfilename: str='',
    answerfile: str='',
    load_results: bool=True,
    print_results: bool=True,
    style_control: bool=True,
    len_control: bool=False,
    no_control: bool=False,
    pen_only_model: bool=False,
    get_mean_scores: bool=True,
    get_elo_ratings: bool=True
):
    """Load battle files, compute ratings, then print and/or save them.

    Mode precedence: ``no_control`` > ``style_control`` > ``len_control``.
    ``no_control`` and ``len_control`` always produce mean scores only, so
    they override ``get_elo_ratings`` / ``get_mean_scores``.

    Args:
        sourse_files: Path or list of paths to JSON files with battle records.
            Unreadable files are reported and skipped.
        model: If given, mean scores are computed for this model only.
        save_all_battles: Write the merged records to ``newfilename``.
        newfilename: Target for the merged records (the default name is chosen
            by save_battles_to_json when empty).
        answerfile: Target for the scores; ``results.json`` when empty.
        load_results: Write the scores to ``answerfile``.
        print_results: Print the scores.
        style_control, len_control, no_control: Control mode, see above.
        pen_only_model: Forwarded to the length-control weighting.
        get_mean_scores, get_elo_ratings: Which score families to output.

    Raises:
        ValueError: If nothing is requested, no source is given, no option is
            selected, or no records could be loaded.
    """
    if not load_results and not print_results:
        raise ValueError("No task!")
    if not sourse_files:
        raise ValueError("No data available for analysis.")
    # Validate the option combination up front, before any file is read or the
    # merged battles file is written.
    if not (style_control or len_control or no_control) and not (get_elo_ratings or get_mean_scores):
        raise ValueError("No option selected.")

    results = []
    if isinstance(sourse_files, str):
        sourse_files = [sourse_files]
    if isinstance(sourse_files, list):
        read_files_count = 0
        for file in sourse_files:
            try:
                results += get_results_from_file(file)
            except (FileNotFoundError, IsADirectoryError):
                print(f"File {file} was not found or is empty.")
            except json.JSONDecodeError:
                print(f"Error: {file} is not JSON-format.")
            except Exception as e:
                print(f"Error with file {file}: {e}")
            else:
                read_files_count += 1
        if read_files_count == len(sourse_files):
            print("All data has been received.")
    if not results:
        raise ValueError("No data for computing!")
    if save_all_battles:
        save_battles_to_json(newfilename, results)

    # Resolve the effective mode; exactly one branch applies.
    if no_control:
        style_control, len_control = False, False
        get_elo_ratings, get_mean_scores = False, True
    elif style_control:
        len_control = False
    elif len_control:
        get_elo_ratings, get_mean_scores = False, True

    scores = _calculate_ratings(
        results=results,
        model=model,
        style_control=style_control,
        len_control=len_control,
        pen_only_model=pen_only_model,
        get_mean_scores=get_mean_scores,
        get_elo_ratings=get_elo_ratings)
    if not get_elo_ratings:
        scores.pop("elo", None)
    if not get_mean_scores:
        scores.pop("mean_scores", None)

    if print_results:
        if get_elo_ratings:
            print('ELO RATINGS:')
            for m in scores['elo']:
                print(f"{str(m):40} | {scores['elo'][m]}")
            print()
        if get_mean_scores:
            print('MEAN SCORES:')
            for m in scores['mean_scores']:
                print(f"{str(m):40} | {scores['mean_scores'][m]}")
            print()
    if load_results:
        if not answerfile:
            answerfile = 'results.json'
        with open(answerfile, "w", encoding="utf-8") as file:
            json.dump(scores, file, ensure_ascii=False, indent=4)

In [95]:
# NOTE: no_control=True takes precedence inside get_ratings: it switches off
# style and length control and forces get_elo_ratings=False and
# get_mean_scores=True. The style_control / get_mean_scores / get_elo_ratings
# values passed below are therefore overridden, and only the mean scores are
# printed and written to the results file.
get_ratings(
    sourse_files="previous_battles.json",
    model='',
    save_all_battles=True,
    newfilename='',
    answerfile='',
    load_results=True,
    print_results=True,
    style_control=True,
    len_control=False,
    no_control=True,
    pen_only_model=False,
    get_mean_scores=False,
    get_elo_ratings=True
)

All data has been received.
Data was stored to battles_saved_file.json.
MEAN SCORES:
Qwen2.5-32B-Instruct                     | 0.3587421789282791
RuadaptQwen2.5-32B-Pro-Beta              | 0.6633075218114936
RuadaptQwen2.5-7B-Lite-Beta              | 0.580000368883008
gigachat_max_26.20_uncen                 | 0.47070365701733685
gpt-4-1106-preview                       | 0.5577940168943933
gpt-4o-mini                              | 0.3917601471368512
llama3-70b                               | 0.5789097690632004
tlite                                    | 0.5443629814896822
tpro                                     | 0.7075199912293207
vikhr12b                                 | 0.49790500762108836
yagpt5lite                               | 0.14899435992534638

