In [3]:
import pandas as pd
import numpy as np
from math import ceil
from typing import List, Tuple, Dict, Optional

In [4]:
# Load the measurements table; the first CSV column is used as the row index.
df = pd.read_csv("data.csv", index_col=0)
# Display the last rows as a quick look at the loaded frame.
df.tail()

Unnamed: 0,m1_0001,m1_0002,m1_0003,m1_0004,m1_0005,m1_0006,m1_0007,m1_0008,m1_0009,m1_0010,...,m1_1431,m1_1432,m1_1433,m1_1434,m1_1435,m1_1436,m1_1437,m1_1438,m1_1439,m1_1440
28,20.4946,21.947688,18.980332,21.228592,19.8051,20.879488,20.699952,17.79582,14.51408,12.34352,...,36.65368,21.7091,24.159072,25.510604,24.901772,17.027136,9.634464,6.302212,6.38232,8.345176
29,173.283012,160.870948,151.662308,138.247648,153.492276,147.209272,161.401408,144.81544,136.657136,152.239164,...,119.536396,80.533376,149.231292,146.60618,152.06702,147.2478,157.819648,170.120552,139.415808,138.99382
30,53.779012,54.77374,48.384392,39.384268,30.545284,42.29666,41.167784,47.725272,43.371468,29.093904,...,31.8493,20.235264,37.124892,29.908788,21.70462,26.871096,42.599844,40.83324,55.500592,54.3053
31,7.52864,4.775988,4.111296,3.214876,2.343404,6.64622,3.0191,4.129328,4.455248,2.295888,...,8.70814,4.53698,3.364172,4.013436,5.977384,4.663596,6.676152,5.798912,7.241248,9.090592
32,6.900628,4.594548,4.295564,3.189032,4.03508,4.543616,3.45968,3.32486,4.520684,3.918796,...,26.703768,4.234524,4.833808,3.721984,3.550232,5.1604,2.845752,4.53936,3.67248,3.271828


In [5]:
def compute_k_and_rss(x0, x1, x2, S_prime, subset):
    """Return the residual sum of squares (RSS) of a least-squares fit for one subset.

    The series whose indices are NOT in ``subset`` are subtracted from
    ``S_prime`` (i.e. their coefficient is fixed to 1). The remainder is then
    regressed, without intercept, on the series whose indices ARE in ``subset``.

    Parameters
    ----------
    x0, x1, x2 : array-like, shape (T,)
        The three input series.
    S_prime : array-like, shape (T,)
        The target series. It is never modified; a float copy is used.
    subset : iterable of int
        Indices (taken from 0, 1, 2) of the series whose coefficients are
        fitted. May be empty, in which case nothing is fitted and the RSS of
        ``S_prime - x0 - x1 - x2`` is returned.

    Returns
    -------
    float
        RSS of the fit, or ``inf`` when the least-squares solver raises
        ``numpy.linalg.LinAlgError``.
    """
    series = [np.asarray(x, dtype=float) for x in (x0, x1, x2)]
    fitted = list(subset)

    # Float copy: protects the caller's array and lets the in-place
    # subtraction below work for lists and integer arrays as well.
    target = np.array(S_prime, dtype=float)
    for i in range(3):
        if i not in fitted:
            target -= series[i]

    # Nothing to fit: the remainder itself is the residual.
    if not fitted:
        return float(np.sum(target ** 2))

    design = np.column_stack([series[i] for i in fitted])

    try:
        coef, residuals, _rank, _sing = np.linalg.lstsq(design, target, rcond=None)
    except np.linalg.LinAlgError:
        # Best effort: a failed fit is ranked last instead of aborting the search.
        return float('inf')

    # lstsq leaves `residuals` empty for rank-deficient or non-overdetermined
    # systems; compute the RSS by hand in that case.
    if residuals.size > 0:
        return float(residuals[0])
    return float(np.sum((target - design @ coef) ** 2))

In [6]:
import numpy as np

def add_noise(S, level=0.1, rng=None):
    """Return ``S`` plus multiplicative uniform noise.

    Each element ``s`` becomes ``s + u * level * s`` with ``u`` drawn
    uniformly from [0, 1), so the perturbation has the sign of ``s`` and its
    magnitude is below ``level * |s|``.

    Parameters
    ----------
    S : array-like
        Input values; converted to a float array.
    level : float, default 0.1
        Maximum relative size of the noise.
    rng : numpy.random.Generator, optional
        Source of randomness. When omitted, NumPy's global random state is
        used (the original behaviour); pass a seeded generator for
        reproducible results.

    Returns
    -------
    numpy.ndarray
        Noisy copy of ``S`` with the same shape.
    """
    S = np.asarray(S, dtype=float)
    if rng is None:
        uniform = np.random.random(size=S.shape)
    else:
        uniform = rng.random(size=S.shape)
    return S + uniform * (level * S)

In [7]:
import itertools

# All ways to choose 2 of the 3 series positions (0, 1, 2).
candidates = list(itertools.combinations([0,1,2], 2))
candidates

[(0, 1), (0, 2), (1, 2)]

In [8]:
# Single run on the first three rows: series 1 and 3 are scaled by k1 and k3,
# series 2 is passed through unchanged.
x1, x2, x3 = (df.iloc[row].to_numpy(dtype=float) for row in range(3))

k1, k3 = 0.7, 0.2
x1_ch, x2_ch, x3_ch = k1 * x1, x2.copy(), k3 * x3

# Noisy sum of the unscaled series.
S_prime = x1 + x2 + x3
S_prime = add_noise(S_prime)

# RSS for each of the three candidate subsets.
for pos in range(3):
    print(candidates[pos], compute_k_and_rss(x1_ch, x2_ch, x3_ch, S_prime, candidates[pos]))

(0, 1) 55011.12787515695
(0, 2) 894.7600318075982
(1, 2) 19740.1501988669


In [None]:
def method_n_cheaters(x1, x2, x3, S_prime, n_cheaters):
    """Pick the subset of exactly ``n_cheaters`` series with the lowest RSS.

    Every combination of ``n_cheaters`` positions out of (0, 1, 2) is scored
    with ``compute_k_and_rss``; the combination with the smallest RSS wins
    (the first one in enumeration order on ties).

    Parameters
    ----------
    x1, x2, x3 : array-like, shape (T,)
        The three input series (positions 0, 1, 2 respectively).
    S_prime : array-like, shape (T,)
        The target series.
    n_cheaters : int
        Size of the subset to select.

    Returns
    -------
    set of int
        Positions of the selected subset.

    Raises
    ------
    ValueError
        If no subset of the requested size exists (``n_cheaters`` > 3, or
        negative, in which case ``itertools.combinations`` raises).
    """
    x1 = np.asarray(x1, dtype=float)
    x2 = np.asarray(x2, dtype=float)
    x3 = np.asarray(x3, dtype=float)
    # Convert once so lists and other array-likes are accepted downstream.
    S_prime = np.asarray(S_prime, dtype=float)

    candidates = list(itertools.combinations(range(3), n_cheaters))
    if not candidates:
        raise ValueError(f"n_cheaters must not exceed 3, got {n_cheaters}")

    # min() returns the first minimal element, matching a stable sort + [0].
    best = min(
        candidates,
        key=lambda subset: compute_k_and_rss(x1, x2, x3, S_prime, subset),
    )
    return set(best)

def method_unknow_cheaters(x1, x2, x3, S_prime, tau=0.2):
    """Flag the series whose fitted coefficient deviates from 1 by more than ``tau``.

    ``S_prime`` is regressed (ordinary least squares, no intercept) on the
    three series; position ``i`` is flagged when ``|k_hat[i] - 1| > tau``.

    Parameters
    ----------
    x1, x2, x3 : array-like, shape (T,)
        The three input series (positions 0, 1, 2 respectively).
    S_prime : array-like, shape (T,)
        The target series.
    tau : float, default 0.2
        Absolute deviation threshold on the coefficients.

    Returns
    -------
    set of int or None
        Flagged positions, or ``None`` when nothing is flagged.
    """
    X = np.column_stack([np.asarray(x, dtype=float) for x in (x1, x2, x3)])
    S = np.asarray(S_prime, dtype=float)

    k_hat = np.linalg.lstsq(X, S, rcond=None)[0]
    flagged = np.flatnonzero(np.abs(k_hat - 1.0) > tau)

    # None (not an empty set) signals "no decision" to callers.
    return set(flagged.tolist()) if flagged.size > 0 else None

def method_unknown_cheaters_bootstrap(x1, x2, x3, S_prime, B=300, block_len=48,
                                      alpha=0.05, z=2.0, seed=None,
                                      return_details=False, min_delta=0.09,
                                      direction='both'):
    """Flag series whose fitted coefficient differs from 1, using a block bootstrap.

    ``S_prime`` is regressed (OLS, no intercept) on the three series. The fit
    is repeated on ``B`` resamples built from randomly chosen contiguous
    blocks of rows, giving a bootstrap standard error and percentile
    confidence interval for each coefficient. Position ``i`` is flagged when
    both hold:

    * statistical evidence: the percentile CI excludes 1, or
      ``|k_hat[i] - 1| / se_boot[i] > z``;
    * practical size / direction: the deviation ``k_hat[i] - 1`` exceeds
      ``min_delta`` in the requested ``direction``.

    Parameters
    ----------
    x1, x2, x3 : array-like, shape (T,)
        The three input series (positions 0, 1, 2 respectively).
    S_prime : array-like, shape (T,)
        The target series.
    B : int, default 300
        Number of bootstrap resamples (at least 2).
    block_len : int, default 48
        Length of each resampled block; clamped to the range [1, T].
    alpha : float, default 0.05
        The CI covers the ``alpha/2`` to ``1 - alpha/2`` percentiles.
    z : float, default 2.0
        Threshold on the bootstrap t-statistic.
    seed : int or None, default None
        Seed for ``numpy.random.default_rng``.
    return_details : bool, default False
        When true, also return a dict with the intermediate quantities.
    min_delta : float, default 0.09
        Minimum deviation from 1 that is considered relevant.
    direction : {'both', 'less', 'greater'}, default 'both'
        Which sign of the deviation is considered.

    Returns
    -------
    set of int, or (set of int, dict)
        Flagged positions (possibly empty); with ``return_details`` the
        details dict is returned as a second element.

    Raises
    ------
    ValueError
        If the inputs differ in length or are empty, if ``B < 2``, or if
        ``direction`` is not one of the accepted values.
    """
    if direction not in ('both', 'less', 'greater'):
        raise ValueError(f"direction must be 'both', 'less' or 'greater', got {direction!r}")
    if B < 2:
        raise ValueError(f"B must be at least 2 to estimate a standard error, got {B}")

    rng = np.random.default_rng(seed)

    x1 = np.asarray(x1, dtype=float)
    x2 = np.asarray(x2, dtype=float)
    x3 = np.asarray(x3, dtype=float)
    S = np.asarray(S_prime, dtype=float)

    if not (x1.size == x2.size == x3.size == S.size):
        raise ValueError("Все входные вектора должны быть одинаковой длины")

    T = S.size
    if T == 0:
        raise ValueError("input vectors must not be empty")
    X = np.column_stack([x1, x2, x3])

    # OLS on the original data.
    k_hat, residuals, _rank, _sv = np.linalg.lstsq(X, S, rcond=None)
    if residuals.size > 0:
        RSS = float(residuals[0])
    else:
        RSS = float(np.sum((S - X.dot(k_hat)) ** 2))

    # Keep the block length within [1, T].
    block_len = min(max(int(block_len), 1), T)

    n_blocks = int(ceil(T / block_len))
    max_start = T - block_len  # every block [start, start + block_len) fits in [0, T)
    offsets = np.arange(block_len)
    K_boot = np.zeros((B, 3))

    for b in range(B):
        if max_start > 0:
            starts = rng.integers(0, max_start + 1, size=n_blocks)
        else:
            starts = np.zeros(n_blocks, dtype=int)
        # Concatenate the blocks and trim to the original sample size.
        idx = (starts[:, None] + offsets).ravel()[:T]
        K_boot[b, :] = np.linalg.lstsq(X[idx, :], S[idx], rcond=None)[0]

    se_boot = np.std(K_boot, axis=0, ddof=1)
    # Guard the division below against a degenerate zero spread.
    se_nonzero = np.where(se_boot == 0, 1e-12, se_boot)
    ci_lower = np.percentile(K_boot, 100.0 * (alpha / 2.0), axis=0)
    ci_upper = np.percentile(K_boot, 100.0 * (1.0 - alpha / 2.0), axis=0)
    t_stats = np.abs(k_hat - 1.0) / se_nonzero

    details = {
        'k_hat': k_hat, 'RSS': RSS, 'K_boot': K_boot,
        'se_boot': se_boot, 'ci_lower': ci_lower, 'ci_upper': ci_upper, 't_stats': t_stats,
        'alpha': alpha, 'z': z, 'block_len': block_len, 'B': B,
        'min_delta': min_delta, 'direction': direction
    }

    # Practical significance, restricted to the requested direction.
    deltas = k_hat - 1.0
    if direction == 'less':
        direction_ok = deltas < -min_delta
    elif direction == 'greater':
        direction_ok = deltas > min_delta
    else:  # both
        direction_ok = np.abs(deltas) > min_delta

    # Statistical significance: the CI excludes 1, or the t-statistic exceeds z.
    ci_excludes_1 = (ci_upper < 1.0) | (ci_lower > 1.0)
    stat_ok = ci_excludes_1 | (t_stats > z)

    # Final rule: both statistically and practically significant.
    pred = set(np.flatnonzero(stat_ok & direction_ok).tolist())

    if return_details:
        return pred, details
    return pred


In [None]:
# Single run on one triple of rows: series 0 and 2 are scaled by k0 and k2,
# series 1 is passed through unchanged.
x0, x1, x2 = (df.iloc[row].to_numpy(dtype=float) for row in (30, 6, 5))

k0, k2 = 0.7, 0.2
x0_ch, x1_ch, x2_ch = k0 * x0, x1.copy(), k2 * x2

# Noisy sum of the unscaled series.
S_prime = x0 + x1 + x2
S_prime = add_noise(S_prime)

# The fixed-size method needs the subset size; the other two do not.
print(method_n_cheaters(x0_ch, x1_ch, x2_ch, S_prime, 2))
for detector in (method_unknow_cheaters, method_unknown_cheaters_bootstrap):
    print(detector(x0_ch, x1_ch, x2_ch, S_prime))

{0, 2}
{0, 2}
{0, 2}


In [None]:
# Enumerate all possible triples of rows with a given coefficient
from itertools import combinations

def _tally_prediction(pred, true_set, counts):
    """Update ``counts`` in place for one prediction; return True iff it equals ``true_set``."""
    counts['total'] += 1
    if pred is None:
        # None means the method gave no answer.
        counts['abstained'] += 1
        return False
    if set(pred) == true_set:
        counts['correct'] += 1
        return True
    counts['incorrect'] += 1
    return False


def _metrics_from_counts(method, counts):
    """Build the summary dict for one method from its tally counters."""
    total = counts['total']
    return {
        'method': method,
        'total': total,
        'correct': counts['correct'],
        'incorrect': counts['incorrect'],
        'abstained': counts['abstained'],
        'accuracy': counts['correct'] / total if total > 0 else 0.0,
        'false_positive_rate': counts['incorrect'] / total if total > 0 else 0.0
    }


def evaluate_all_triples(df, k, n_cheaters):
    """Evaluate ``method_n_cheaters`` and ``method_unknow_cheaters`` on every row triple.

    For each combination of three rows of ``df`` a noisy sum of the rows is
    built once with ``add_noise``. Then, for every subset of ``n_cheaters``
    positions, the rows at those positions are multiplied by ``k`` and both
    methods are asked to recover the subset.

    Parameters
    ----------
    df : pandas.DataFrame
        One series per row; all values must be convertible to float.
    k : float
        Scaling coefficient applied to the selected rows.
    n_cheaters : int
        Number of scaled rows per triple.

    Returns
    -------
    (pandas.DataFrame, dict, dict)
        * One record per (triple, true subset) with the predictions of both
          methods. ``correct`` is True only when both methods are right;
          ``correct_know`` / ``correct_unknow`` give the per-method result.
        * Metrics for ``method_n_cheaters`` ('know').
        * Metrics for ``method_unknow_cheaters`` ('unknow').

        In the metrics, ``false_positive_rate`` is computed as
        ``incorrect / total``, i.e. the share of wrong (non-abstained) answers.

    Side effects
    ------------
    Prints a short summary of both methods.
    """
    # Convert once instead of calling df.iloc[...].to_numpy() for every triple.
    rows = df.to_numpy(dtype=float)

    counts_know = {'total': 0, 'correct': 0, 'incorrect': 0, 'abstained': 0}
    counts_unknow = {'total': 0, 'correct': 0, 'incorrect': 0, 'abstained': 0}
    records = []

    for (i, j, l) in combinations(range(rows.shape[0]), 3):
        x_list = [rows[i], rows[j], rows[l]]
        # The noisy total is shared by all true subsets of this triple.
        S_prime = add_noise(x_list[0] + x_list[1] + x_list[2])

        for true_subset in combinations(range(3), n_cheaters):
            true_set = set(true_subset)
            x_ch = [k * x if pos in true_set else x for pos, x in enumerate(x_list)]

            pred_know = method_n_cheaters(x_ch[0], x_ch[1], x_ch[2], S_prime, n_cheaters)
            pred_unknow = method_unknow_cheaters(x_ch[0], x_ch[1], x_ch[2], S_prime)

            ok_know = _tally_prediction(pred_know, true_set, counts_know)
            ok_unknow = _tally_prediction(pred_unknow, true_set, counts_unknow)

            records.append({
                'triple': (i, j, l),
                'true_subset': true_set,
                'pred_know': pred_know,
                'pred_unknow': pred_unknow,
                'correct': ok_know and ok_unknow,
                'correct_know': ok_know,
                'correct_unknow': ok_unknow
            })

    df_res = pd.DataFrame.from_records(records)

    metrics_know = _metrics_from_counts('know', counts_know)
    metrics_unknow = _metrics_from_counts('unknow', counts_unknow)

    print('--- Результаты:')
    print('Параметры: k =', k, '| total checks =', metrics_know['total'])
    print('Know: Accuracy =', metrics_know['accuracy'], '| False positive rate =', metrics_know['false_positive_rate'])
    print('Unknow: Accuracy =', metrics_unknow['accuracy'], '| False positive rate =', metrics_unknow['false_positive_rate'])

    return df_res, metrics_know, metrics_unknow

In [29]:
# Evaluate both methods on every row triple with k = 0.2 and n_cheaters = 2.
df_res, metrics_know, metrics_unknow = evaluate_all_triples(df, 0.2, 2)

--- Результаты:
Параметры: k = 0.2 | total checks = 16368
Know: Accuracy = 0.9292521994134897 | False positive rate = 0.07074780058651027
Unknow: Accuracy = 0.9407380254154448 | False positive rate = 0.05926197458455523
