In [None]:
from load import load_tasks_data, load_model_task_data
import numpy as np
from difficulty import get_difficulty_per_trial, quantize_difficulties, convert_difficulties_to_quantiles
from accuracy import get_accuracy_per_trial, get_accuracy_per_model, Split, normalize_accuracy
import json
from sklearn.linear_model import LogisticRegression, LinearRegression
from sklearn.svm import LinearSVC
from sklearn.neural_network import MLPClassifier
from sklearn.ensemble import GradientBoostingClassifier, RandomForestClassifier
from sklearn.metrics import mean_squared_error
from statsmodels.othermod.betareg import BetaModel
import matplotlib.pyplot as plt
from collections import defaultdict

In [None]:
# Task metadata; later cells index it by task name and read entries with
# .get("num_options").
tasks = load_tasks_data()
# Task used by the exploratory cells that follow.
TASK = "boolq"
# Results for TASK keyed by model name; later code reads each record's
# "is_correct" field from accuracy_per_model[model].
accuracy_per_model = get_accuracy_per_model(TASK)

In [None]:
# Regroup the per-model results by trial: downstream code looks up
# accuracy_per_trial[trial_id] and filters those records on record["model"].
accuracy_per_trial = get_accuracy_per_trial(accuracy_per_model)

In [None]:
# One difficulty value per trial id (consumed below through .items()/.values()).
# NOTE(review): exclude_models=[] excludes nothing, so the held-out test models
# presumably also contribute to the difficulty estimate - confirm this is intended.
instance_difficulties = get_difficulty_per_trial(accuracy_per_trial, exclude_models=[], num_options=tasks[TASK].get("num_options"))

In [None]:
# Models whose results are used to fit the predictors in the scoring functions.
train_models = [
    "openai_ada",
    "openai_curie",
    "openai_babbage",
    "ai21_j1-large",
    "together_gpt-j-6b",
    "together_gpt-neox-20b",
    "together_opt-66b",
    "microsoft_TNLGv2_7B",
]
# Held-out models whose task-level accuracy the predictors are scored on.
test_models = [
    "openai_davinci",
    "ai21_j1-jumbo",
    "together_bloom",
    "together_opt-175b",
    "microsoft_TNLGv2_530B",
    "together_yalm",
]

In [None]:
# Parameter count per model name, read from models.json (path is relative to
# the working directory); the featurizers below take np.log of these values.
with open("models.json") as f:
    param_counts = json.load(f)

In [None]:
def featurize_with_difficulty(instance_difficulties, accuracy_per_trial, param_counts, models):
    """Build one (features, label) example per (trial, model) pair.

    Args:
        instance_difficulties: Mapping of trial id -> difficulty value.
        accuracy_per_trial: Mapping of trial id -> list of record dicts, each
            carrying a 'model' and an "is_correct" entry.
        param_counts: Mapping of model name -> parameter count.
        models: Model names to emit examples for.

    Returns:
        A ``(features, labels)`` pair of equal-length lists. Each feature row is
        ``[difficulty, log(param_count)]``; each label is the "is_correct" value
        of the first record of that model for that trial.

    Raises:
        IndexError: If a model has no record for some trial.
    """
    features, labels = [], []
    for trial_id in instance_difficulties:
        trial_difficulty = instance_difficulties[trial_id]
        for model_name in models:
            features.append([trial_difficulty, np.log(param_counts[model_name])])
            # Only the first matching record is used if a model appears twice.
            matching = [rec for rec in accuracy_per_trial[trial_id] if rec['model'] == model_name]
            labels.append(matching[0]["is_correct"])
    return features, labels

In [None]:
def get_task_level_accuracy_per_model(accuracy_per_model, models, num_options=None):
    """Compute the mean "is_correct" value of each requested model.

    Args:
        accuracy_per_model: Mapping of model name -> list of record dicts with
            an "is_correct" entry.
        models: Model names to include in the result.
        num_options: When not None, each mean is additionally passed through
            ``normalize_accuracy(mean, num_options)``.

    Returns:
        Dict of model name -> (optionally normalized) mean accuracy.

    Raises:
        ZeroDivisionError: If a requested model has an empty record list.
    """
    task_accuracy = {}
    for model_name in models:
        records = accuracy_per_model[model_name]
        num_correct = sum([rec["is_correct"] for rec in records])
        task_accuracy[model_name] = num_correct / len(records)
    if num_options is None:
        return task_accuracy
    return {model_name: normalize_accuracy(task_accuracy[model_name], num_options) for model_name in models}

In [None]:
def featurize(accuracy_per_model, param_counts, models):
    """Build one task-level example per model.

    Args:
        accuracy_per_model: Mapping of model name -> list of result records.
        param_counts: Mapping of model name -> parameter count.
        models: Model names to emit examples for, in output order.

    Returns:
        A ``(features, labels)`` pair: each feature row is ``[log(param_count)]``
        and each label is the model's un-normalized mean accuracy as returned by
        ``get_task_level_accuracy_per_model``.
    """
    mean_accuracy_per_model = get_task_level_accuracy_per_model(accuracy_per_model, models)
    features = [[np.log(param_counts[name])] for name in models]
    labels = [mean_accuracy_per_model[name] for name in models]
    return features, labels

In [None]:
def predict_task_level_accuracy(clf, param_count, instance_difficulties, num_options=None):
    """Predict a model's task-level accuracy from per-instance predictions.

    Args:
        clf: Fitted classifier exposing ``predict_proba``; column 1 of its
            output is read as the predicted probability for each instance.
        param_count: Parameter count of the model being predicted.
        instance_difficulties: Mapping of instance id -> difficulty value.
        num_options: When not None, the averaged prediction is passed through
            ``normalize_accuracy(accuracy, num_options)``.

    Returns:
        Mean of the per-instance column-1 probabilities, optionally normalized.
    """
    rows = []
    for difficulty in instance_difficulties.values():
        rows.append([difficulty, np.log(param_count)])
    positive_probabilities = clf.predict_proba(rows)[:, 1]
    accuracy = np.mean(positive_probabilities)
    if num_options is None:
        return accuracy
    return normalize_accuracy(accuracy, num_options)

In [None]:
def get_task_level_predictions_instance(clf, models, param_counts, instance_difficulties, num_options=None):
    """Predict task-level accuracy for several models with an instance-level classifier.

    Args:
        clf: Fitted classifier forwarded to ``predict_task_level_accuracy``.
        models: Model names to predict for.
        param_counts: Mapping of model name -> parameter count.
        instance_difficulties: Mapping of instance id -> difficulty value.
        num_options: Forwarded unchanged to ``predict_task_level_accuracy``.

    Returns:
        Dict of model name -> predicted task-level accuracy.
    """
    predictions = {}
    for model_name in models:
        predictions[model_name] = predict_task_level_accuracy(
            clf, param_counts[model_name], instance_difficulties, num_options
        )
    return predictions

In [None]:
def fit_beta_model(features, labels):
    """Fit a statsmodels ``BetaModel`` on the given examples.

    Args:
        features: Sequence of feature rows; converted to an array and passed as
            the model's second argument as-is (no constant column is added here).
        labels: Sequence of target values; converted to an array and passed as
            the model's first argument.

    Returns:
        ``(model, params)``: the unfitted-state model object and the ``params``
        attribute of the result returned by ``model.fit()``.
    """
    targets = np.array(labels)
    design = np.array(features)
    model = BetaModel(targets, design)
    fit_result = model.fit()
    return model, fit_result.params

def predict_beta_model(model, params, features):
    """Evaluate a fitted model on new feature rows.

    Args:
        model: Object exposing ``predict(params, exog=...)``.
        params: Fitted parameters, passed positionally to ``model.predict``.
        features: Feature rows, passed as the ``exog`` keyword.

    Returns:
        Whatever ``model.predict`` returns for these inputs.
    """
    predictions = model.predict(params, exog=features)
    return predictions

In [None]:


def group_instances_by_difficulty(instance_difficulties, num_bins=10):
    """Group instance ids by the bin their difficulty falls into.

    Bin edges are ``np.linspace(0, 1, num_bins)`` and indices come from
    ``np.digitize`` with its default ``right=False``. In effect, values in
    [0, 1) land in bins 1..num_bins-1, a value of exactly 1.0 lands in bin
    ``num_bins``, and values below 0 land in bin 0.

    Args:
        instance_difficulties: Mapping of instance id -> difficulty value.
        num_bins: Number of bin edges placed evenly between 0 and 1.

    Returns:
        ``defaultdict(list)`` of bin index -> instance ids, in input order.
        Only bins that received at least one instance are present.
    """
    edges = np.linspace(0, 1, num_bins)
    difficulty_values = np.array(list(instance_difficulties.values()))
    bin_indices = np.digitize(difficulty_values, edges)
    instance_ids_by_difficulty = defaultdict(list)
    for position, instance_id in enumerate(instance_difficulties.keys()):
        instance_ids_by_difficulty[bin_indices[position]].append(instance_id)
    return instance_ids_by_difficulty

def get_accuracy_per_difficulty(model, instance_ids_by_difficulty, accuracy_per_trial):
    """Compute one model's mean "is_correct" value within each difficulty bin.

    Args:
        model: Model name to select records for.
        instance_ids_by_difficulty: Mapping of bin key -> list of instance ids.
        accuracy_per_trial: Mapping of instance id -> list of record dicts with
            "model" and "is_correct" entries.

    Returns:
        Dict of bin key -> ``np.mean`` over every matching record in that bin
        (NaN when the model has no record in a bin).
    """
    accuracy_per_difficulty = {}
    for difficulty, instance_ids in instance_ids_by_difficulty.items():
        outcomes = []
        for instance_id in instance_ids:
            for record in accuracy_per_trial[instance_id]:
                if record["model"] == model:
                    outcomes.append(record["is_correct"])
        accuracy_per_difficulty[difficulty] = np.mean(outcomes)
    return accuracy_per_difficulty


def predict_task_level_accuracy_with_binned_difficulties(model, params, param_count, instance_difficulties, num_options=None):
    """Predict a model's task-level accuracy from per-bin predictions.

    Instances are grouped with ``group_instances_by_difficulty`` (default bin
    count). One prediction is made per occupied bin, using the bin index as
    the difficulty feature. The result is the average of those predictions
    weighted by the number of instances in each bin.

    Args:
        model: Fitted model object passed to ``predict_beta_model``.
        params: Fitted parameters passed to ``predict_beta_model``.
        param_count: Parameter count of the model being predicted.
        instance_difficulties: Mapping of instance id -> difficulty value.
        num_options: When not None, the weighted average is passed through
            ``normalize_accuracy(accuracy, num_options)``.

    Returns:
        The weighted-average prediction, optionally normalized.
    """
    binned_difficulties = group_instances_by_difficulty(instance_difficulties)
    bin_features = [[bin_index, np.log(param_count)] for bin_index in binned_difficulties]
    bin_predictions = predict_beta_model(model, params, bin_features)
    # Weights follow the same dict iteration order as the feature rows above.
    bin_sizes = [len(instance_ids) for instance_ids in binned_difficulties.values()]
    accuracy = np.average(bin_predictions, weights=bin_sizes)
    if num_options is None:
        return accuracy
    return normalize_accuracy(accuracy, num_options)

def get_task_level_predictions_binned(model, params, models, param_counts, instance_difficulties, num_options=None):
    """Predict task-level accuracy for several models with the binned predictor.

    Args:
        model: Fitted model object forwarded to the per-model predictor.
        params: Fitted parameters forwarded to the per-model predictor.
        models: Model names to predict for.
        param_counts: Mapping of model name -> parameter count.
        instance_difficulties: Mapping of instance id -> difficulty value.
        num_options: Forwarded unchanged to the per-model predictor.

    Returns:
        Dict of model name -> predicted task-level accuracy.
    """
    predictions = {}
    for model_name in models:
        predictions[model_name] = predict_task_level_accuracy_with_binned_difficulties(
            model, params, param_counts[model_name], instance_difficulties, num_options
        )
    return predictions

In [None]:
def featurize_with_binned_difficulty(instance_difficulties, accuracy_per_trial, param_counts, models):
    """Build one (features, label) example per (model, occupied difficulty bin).

    Args:
        instance_difficulties: Mapping of instance id -> difficulty value;
            grouped into bins via ``group_instances_by_difficulty(..., 10)``.
        accuracy_per_trial: Mapping of instance id -> list of result records.
        param_counts: Mapping of model name -> parameter count.
        models: Model names to emit examples for.

    Returns:
        A ``(features, labels)`` pair. Each feature row is
        ``[bin_index, log(param_count)]``; each label is the model's mean
        accuracy in that bin, clamped to ``[1e-6, 1 - 1e-6]``.
    """
    features, labels = [], []
    instances_by_difficulty = group_instances_by_difficulty(instance_difficulties, 10)
    for model_name in models:
        per_bin_accuracy = get_accuracy_per_difficulty(model_name, instances_by_difficulty, accuracy_per_trial)
        for bin_index in per_bin_accuracy:
            features.append([bin_index, np.log(param_counts[model_name])])
            # Keep labels strictly inside (0, 1) - presumably required by the
            # beta regression these examples are fed to; confirm with caller.
            clamped = max(min(per_bin_accuracy[bin_index], 1 - 1e-6), 1e-6)
            labels.append(clamped)
    return features, labels

In [None]:
def get_task_level_predictions(clf, models, param_counts):
    """Predict task-level accuracy for each model from its parameter count alone.

    Args:
        clf: Fitted regressor exposing ``predict``; called once per model with
            the single row ``[[log(param_count)]]``.
        models: Model names to predict for.
        param_counts: Mapping of model name -> parameter count.

    Returns:
        Dict of model name -> first element of ``clf.predict`` for that model.
    """
    predictions = {}
    for model_name in models:
        single_row = [[np.log(param_counts[model_name])]]
        predictions[model_name] = clf.predict(single_row)[0]
    return predictions

In [None]:
def plot_predicted_accs(clf, models, param_counts, num_options=None):
    """Plot the classifier's predicted probability against difficulty, one curve per model.

    Args:
        clf: Fitted classifier exposing ``predict_proba``; column 1 is plotted.
        models: Model names to draw; each becomes a labelled line.
        param_counts: Mapping of model name -> parameter count.
        num_options: Accepted but not used by this function.

    Returns:
        None. Draws onto the current matplotlib figure and adds a legend.
    """
    difficulty_grid = np.linspace(0, 1, 100)
    for model_name in models:
        log_params = np.log(param_counts[model_name])
        curve_features = [[difficulty, log_params] for difficulty in difficulty_grid]
        positive_probabilities = clf.predict_proba(curve_features)[:, 1]
        plt.plot(difficulty_grid, positive_probabilities, label=model_name)
    plt.legend()

In [None]:
def score_difficulty_based_approach(instance_difficulties, accuracy_per_model, accuracy_per_trial, task):
    """Score the instance-level (difficulty + model size) predictor on the test models.

    Fits a logistic regression on per-instance examples of the notebook-level
    ``train_models``, predicts the task-level accuracy of each of the
    notebook-level ``test_models``, and compares it with their observed accuracy.

    Args:
        instance_difficulties: Mapping of instance id -> difficulty value.
        accuracy_per_model: Mapping of model name -> list of result records for
            the task being scored.
        accuracy_per_trial: Mapping of instance id -> list of result records.
        task: Task name; used to look up ``num_options`` in the notebook-level
            ``tasks`` mapping.

    Returns:
        Mean squared error between observed and predicted task-level accuracy
        over ``test_models``.
    """
    # Only the training examples are needed: test-set scoring goes through the
    # task-level aggregates below, so the test models are not featurized here.
    train_features, train_labels = featurize_with_difficulty(instance_difficulties, accuracy_per_trial, param_counts, train_models)
    clf = LogisticRegression().fit(train_features, train_labels)
    # NOTE(review): predictions are normalized with num_options while the
    # observed accuracies are not; the two only share a scale when the task has
    # no num_options entry - confirm this is intended.
    acc = get_task_level_accuracy_per_model(accuracy_per_model, test_models)
    prd = get_task_level_predictions_instance(clf, test_models, param_counts, instance_difficulties, num_options=tasks[task].get("num_options"))
    # Both dicts are built by iterating test_models, so their values line up.
    return mean_squared_error(list(acc.values()), list(prd.values()))

def score_task_level_approach(accuracy_per_model):
    """Score the task-level baseline (accuracy regressed on log parameter count).

    Fits a linear regression on the notebook-level ``train_models`` and
    evaluates it on the notebook-level ``test_models``.

    Args:
        accuracy_per_model: Mapping of model name -> list of result records for
            the task being scored.

    Returns:
        Mean squared error between observed and predicted (un-normalized)
        task-level accuracy over ``test_models``.
    """
    train_features, train_labels = featurize(accuracy_per_model, param_counts, train_models)
    clf = LinearRegression().fit(train_features, train_labels)
    # Observed test accuracies come straight from the aggregate helper; a
    # separate test-set featurization would only recompute the same numbers.
    acc = get_task_level_accuracy_per_model(accuracy_per_model, test_models)
    prd = get_task_level_predictions(clf, test_models, param_counts)
    # Both dicts are built by iterating test_models, so their values line up.
    return mean_squared_error(list(acc.values()), list(prd.values()))

def score_quantized_difficulty_based_approach(instance_difficulties, accuracy_per_trial, task, accuracy_per_model=None):
    """Score the binned-difficulty beta-regression predictor on the test models.

    Difficulties are converted to quantiles and binned. A beta regression is
    fitted on per-bin accuracies of the notebook-level ``train_models``. The
    task-level accuracy it predicts for the notebook-level ``test_models`` is
    then compared with their observed accuracy.

    Args:
        instance_difficulties: Mapping of instance id -> difficulty value.
        accuracy_per_trial: Mapping of instance id -> list of result records.
        task: Task name; used to look up ``num_options`` in the notebook-level
            ``tasks`` mapping and, when ``accuracy_per_model`` is omitted, to
            load the per-model results.
        accuracy_per_model: Optional mapping of model name -> list of result
            records for ``task``. Defaults to ``get_accuracy_per_model(task)``.

    Returns:
        Mean squared error between observed and predicted task-level accuracy
        over ``test_models``.
    """
    # Resolve the ground truth from `task` instead of whatever notebook-level
    # `accuracy_per_model` happens to be defined, so the observed accuracies
    # always belong to the task being scored.
    if accuracy_per_model is None:
        accuracy_per_model = get_accuracy_per_model(task)
    difficulty_quantiles = convert_difficulties_to_quantiles(instance_difficulties)
    # Assumes the quantiles come back in the same order as the dict's keys -
    # TODO confirm against convert_difficulties_to_quantiles.
    quantile_per_instance = dict(zip(instance_difficulties.keys(), difficulty_quantiles))
    # Only the training examples are needed; test-set scoring goes through the
    # task-level aggregates below.
    train_features, train_labels = featurize_with_binned_difficulty(quantile_per_instance, accuracy_per_trial, param_counts, train_models)
    model, params = fit_beta_model(train_features, train_labels)
    # NOTE(review): predictions are normalized with num_options while the
    # observed accuracies are not - confirm this is intended.
    acc = get_task_level_accuracy_per_model(accuracy_per_model, test_models)
    prd = get_task_level_predictions_binned(model, params, test_models, param_counts, quantile_per_instance, num_options=tasks[task].get("num_options"))
    # Both dicts are built by iterating test_models, so their values line up.
    return mean_squared_error(list(acc.values()), list(prd.values()))

def compare_approaches(task: str):
    """Load one task's results and score all three prediction approaches on it.

    Args:
        task: Task name understood by ``get_accuracy_per_model`` and present in
            the notebook-level ``tasks`` mapping.

    Returns:
        Tuple ``(difficulty_based_score, task_level_score,
        binned_difficulty_score)`` of mean squared errors, in that order.
    """
    per_model = get_accuracy_per_model(task)
    per_trial = get_accuracy_per_trial(per_model)
    difficulties = get_difficulty_per_trial(
        per_trial,
        exclude_models=[],
        num_options=tasks[task].get("num_options"),
    )
    return (
        score_difficulty_based_approach(difficulties, per_model, per_trial, task),
        score_task_level_approach(per_model),
        score_quantized_difficulty_based_approach(difficulties, per_trial, task),
    )

In [None]:
# Score all three approaches on one task; the cell displays the returned
# (difficulty_based, task_level, binned_difficulty) mean squared errors.
compare_approaches("synthetic_reasoning_pattern_match")