In [2]:
%load_ext autoreload
%autoreload 2

In [3]:
import numpy as np
import pandas as pd
import tqdm
import typing

import theoretical_models

## Constructing (standardised) data frames

Each task's experimentor will have some way that they've saved the data. The DataFrame class below is a base class that each task can inherit from, and standardise the way that the data is stored. It also marries up the experimentor's recall investigations with the speed measuring results, which are computed separately.

In [111]:
class DataFrame:

    @staticmethod
    def _load_dataframe(file: str) -> pd.DataFrame:
        """Helper method to load file into data frame"""
        match file.split(".")[-1]:
            case "csv":
                return pd.read_csv(file)
            case "json":
                return pd.read_json(file)
            case "jsonl":
                return pd.read_json(file, lines=True)
            case _:
                raise ValueError("Don't know how to open filetype")


    @staticmethod
    def _save_dataframe(filename: str, df: pd.DataFrame) -> None:
        """Helper method to save data frame into file"""
        with open(filename, "w") as file:
            match filename.split(".")[-1]:
                case "csv":
                    file.write(df.to_csv())
                case "json":
                    file.write(df.to_json(orient="records"))
                case "jsonl":
                    file.write(df.to_json(orient="records", lines=True))
                case _:
                    raise ValueError("Invalid filetype")


    @staticmethod
    def _build_dataframe(
        cls: typing.Self, 
        files: typing.Union[str, list[str]], 
        feature_prefix: str, 
        features: list[str],
    ) -> pd.DataFrame:
        """Helper method to build a preprocessed data frame"""    
        df = pd.DataFrame()
        if isinstance(files, str):
            data = cls._load_dataframe(files)
        else:
            data = pd.concat([*map(cls._load_dataframe, files)])
        print(features)
        for feature in tqdm.tqdm(features, desc=f"{feature_prefix} features"):
            function = getattr(cls, f"{feature_prefix}_{feature}")
            df.loc[:, feature] = data.apply(function, axis=1)
        return df

    def __new__(
        cls: typing.Self, 
        recall_files: typing.Union[str, list[str]] = None,
        recall_features: list[str] = [],
        speed_files: typing.Union[str, list[str]]  = None,
        speed_features: list[str] = [],
        save_file: str = None,
    ) -> pd.DataFrame:
        """Instantiate a generic preprocessed data frame"""
        recall_features += ["k", "b", "k_mult", "k_b", "interleaved", "recall", "error"]
        if recall_files:
            print(recall_features)
            recall_df = cls._build_dataframe(cls, recall_files, "recall", recall_features)
        if speed_files:
            speed_features += ["n", "n_over_k", "batch_size", "k", "b", "k_mult", "k_b", "interleaved"]
            speed_features += ["method", "compile", "duration_mean", "duration_stdv"]
            speed_features += ["cost_basic", "cost_serial", "cost_parallel", "dtype", "n_inner"]
            speed_df = cls._build_dataframe(cls, speed_files, "speed", speed_features)
            if recall_files:
                merged_df = pd.merge(recall_df, speed_df)
            else:
                merged_df = speed_df
            if save_file:
                cls._save_dataframe(save_file, merged_df)
            return merged_df
        return recall_df, 
            

    def speed_n(row):
        return row["topk_size"]

    def speed_n_over_k(row):
        return round(row["topk_size"] / row["k"], 3)

    def speed_batch_size(row):
        return row["batch_size"]

    def speed_k(row):
        return row["k"]
    
    def speed_b(row):
        return (row["k"] // row["j"]) * row["k_mult"]

    def speed_k_mult(row):
        return row["k_mult"]
    
    def speed_k_b(row):
        return row["j"]

    def speed_interleaved(row):
        return row["args"].get("interleaved", True)
    
    def speed_method(row):
        return row["method"]

    def speed_compile(row):
        return row["compile"]

    def speed_duration_mean(row):
        return np.mean(row["duration"])

    def speed_duration_stdv(row):
        return np.std(row["duration"])

    def speed_cost_basic(row):
        return theoretical_models.cost_basic.approx_topk(
            k=row["k"], 
            n=row["topk_size"],
            m=row["batch_size"],
            b=(row["k"] // row["j"]) * row["k_mult"],
            k_b=row["j"],
        )

    def speed_cost_serial(row):
        return theoretical_models.cost_serial.approx_topk(
            k=row["k"], 
            n=row["topk_size"],
            m=row["batch_size"],
            b=(row["k"] // row["j"]) * row["k_mult"],
            k_b=row["j"],
        )

    def speed_cost_parallel(row):
        return theoretical_models.cost_parallel.approx_topk(
            k=row["k"], 
            n=row["topk_size"],
            m=row["batch_size"],
            b=(row["k"] // row["j"]) * row["k_mult"],
            k_b=row["j"],
        )

    def speed_dtype(row):
        return row["dtype"]

    def speed_n_inner(row):
        return row["n_inner"]

The task-specific data frame classes below describe the mapping from each experimentor's saved data to a standard approach. 

_Note: as benchmarking has all been run by Alberto, the speed data is the same across all tasks, and therefore is included in the parent DataFrame class._

In [112]:
class VocabDataFrame(DataFrame):

    def recall_k(row):
        return row["k"]
    
    def recall_b(row):
        return row["num_buckets"] * row["k_mult"]

    def recall_k_mult(row):
        return row["k_mult"]
    
    def recall_k_b(row):
        return row["k_per_bucket"]

    def recall_interleaved(row):
        return row["interleaved"]

    def recall_recall(row):
        k = row["k"]
        return row[f"recall_k{k}"]

    def recall_error(row):
        k = row["k"]
        return 1 - row[f"recall_k{k}"]



class GraphDataFrame(DataFrame):

    def recall_k(row):
        return row["K"]
    
    def recall_b(row):
        return row["n_buckets"]

    def recall_k_mult(row):
        return row["k_mult"]
    
    def recall_k_b(row):
        return row["J"]

    def recall_interleaved(_):
        return True

    def recall_recall(row):
        return row[f"recall_interleaved"]

    def recall_error(row):
        return 1 - row[f"recall_interleaved"]


class SparQDataFrame(DataFrame):
    
    def recall_k(row):
        return row["k"]
    
    def recall_b(row):
        return (row["k"] // row["topk_k_per_bucket"]) * row["topk_k_mult"]

    def recall_k_mult(row):
        return row["topk_k_mult"]
    
    def recall_k_b(row):
        return row["topk_k_per_bucket"]

    def recall_interleaved(row):
        return bool(row["topk_interleaved"])

    def recall_recall(_):
        return None

    def recall_error(_):
        return None

    def recall_score(row):
        return row["score"]

    def recall_n(row):
        return {"squad": 1409, "repetition": 1659}[row["task_name"]]

    def recall_exact_topk(row):
        return not row["bucket_topk"]



class SynthDataFrame(DataFrame):
    pass

In [110]:
# # exact_graph_df = GraphDataFrame(
# #     speed_files="../data/measure_speed_kgc_new.jsonl",
# # )
# # exact_graph_df = exact_graph_df[exact_graph_df.method == "approx_topk.torch_default.topk"]
# # exact_graph_df = exact_graph_df[exact_graph_df.compile.isin([None])]
# # exact_graph_df.loc[:, "recall"] = 1




# graph_df = GraphDataFrame(
#     save_file="../data/graph-data-merged.jsonl",
#     recall_files="../data/graph-recall-data.csv", 
#     speed_files="../data/measure_speed_kgc_multi2.jsonl",
# )

# # graph_df

In [109]:

# exact_graph_df = exact_graph_df[exact_graph_df.method == "approx_topk.torch_default.topk"]
# exact_graph_df = exact_graph_df[exact_graph_df.compile.isin([None])]
# exact_graph_df.loc[:, "recall"] = 1

# # GraphDataFrame._save_dataframe("../data/graph-data-exact-merged.jsonl", exact_graph_df)
# i = graph_df[graph_df.k_b == 4]
# i

In [None]:
vocab_df = VocabDataFrame(
    save_file="../data/vocab-data-merged.jsonl",
    recall_files="../data/vocab-recall-data.csv", 
    speed_files="../data/measure_speed_vocab_multi2.jsonl",
)
vocab_df

In [130]:
_dfs = []
for batch_size in tqdm.tqdm([*set(vocab_df.batch_size)], desc="BatchSize"):
    for b in tqdm.tqdm([*set(vocab_df.b)], desc="b"):
        for k_b in [*set(vocab_df.k_b)]:
            for k in [*set(vocab_df.k)]:
                _df = vocab_df[vocab_df.batch_size == batch_size]
                _df = _df[_df.b == b]
                _df = _df[_df.k_b == k_b]
                _df = _df[_df.k == k]
                if len(_df):
                    new_df = _df.iloc[:1]
                    new_df.loc[:, "recall"] = _df.recall.mean(),
                    new_df.loc[:, "error"] = _df.error.mean(),
                    _dfs += [new_df]

big_df = pd.concat(_dfs)
VocabDataFrame._save_dataframe("../data/vocab-data-merged-compressed.jsonl", big_df)


b: 100%|██████████| 16/16 [03:07<00:00, 11.72s/it]
b: 100%|██████████| 16/16 [03:06<00:00, 11.69s/it]17s/it]
b: 100%|██████████| 16/16 [03:04<00:00, 11.52s/it]88s/it]
b: 100%|██████████| 16/16 [03:04<00:00, 11.54s/it]57s/it]
b: 100%|██████████| 16/16 [03:04<00:00, 11.56s/it]04s/it]
BatchSize: 100%|██████████| 5/5 [15:31<00:00, 186.34s/it]


In [135]:
exact_vocab_df = VocabDataFrame(
    speed_files="../data/measure_speed_vocab_new.jsonl",
)
exact_vocab_df = exact_vocab_df[exact_vocab_df.method == "approx_topk.torch_default.topk"]
exact_vocab_df = exact_vocab_df[exact_vocab_df.compile.isin([None])]
exact_vocab_df.loc[:, "recall"] = 1

VocabDataFrame._save_dataframe("../data/vocab-data-exact-merged.jsonl", exact_vocab_df)

['n', 'n_over_k', 'batch_size', 'k', 'b', 'k_mult', 'k_b', 'interleaved', 'method', 'compile', 'duration_mean', 'duration_stdv', 'cost_basic', 'cost_serial', 'cost_parallel', 'dtype', 'n_inner', 'n', 'n_over_k', 'batch_size', 'k', 'b', 'k_mult', 'k_b', 'interleaved', 'method', 'compile', 'duration_mean', 'duration_stdv', 'cost_basic', 'cost_serial', 'cost_parallel', 'dtype', 'n_inner', 'n', 'n_over_k', 'batch_size', 'k', 'b', 'k_mult', 'k_b', 'interleaved', 'method', 'compile', 'duration_mean', 'duration_stdv', 'cost_basic', 'cost_serial', 'cost_parallel', 'dtype', 'n_inner', 'n', 'n_over_k', 'batch_size', 'k', 'b', 'k_mult', 'k_b', 'interleaved', 'method', 'compile', 'duration_mean', 'duration_stdv', 'cost_basic', 'cost_serial', 'cost_parallel', 'dtype', 'n_inner']


speed features: 100%|██████████| 68/68 [00:02<00:00, 24.05it/s]


# SparQ

In [83]:
repetition_df = SparQDataFrame(
    recall_files="../data/sparq_v1.jsonl",
    recall_features=["score", "n", "exact_topk"],
    speed_files="../data/measure_speed_sparq_multi2.jsonl",
)
repetition_df = repetition_df[repetition_df.n == 1659]


squad_df = SparQDataFrame(
    recall_files="../data/sparq_v2.jsonl",
    recall_features=["score", "n", "exact_topk"],
    speed_files="../data/measure_speed_sparq_multi2.jsonl",
)
squad_df = squad_df[squad_df.n == 1409]

merged_df = pd.concat([squad_df, repetition_df])


SparQDataFrame._save_dataframe("../data/sparq-data-merged.jsonl", merged_df)

recall features: 100%|██████████| 10/10 [00:00<00:00, 1032.88it/s]
speed features: 100%|██████████| 1054/1054 [00:00<00:00, 1396.53it/s]
recall features: 100%|██████████| 10/10 [00:00<00:00, 1683.85it/s]
speed features: 100%|██████████| 1071/1071 [00:00<00:00, 1414.33it/s]


In [86]:
dfs = []
for n, file in zip([1659, 1409], ["../data/sparq_v1.jsonl", "../data/sparq_v2.jsonl"]):

    # luka_df = pd.read_json(file, lines=True)
    # luka_df = luka_df[(luka_df.bucket_topk == False) & (luka_df.task_name == task_name)]
    luka_df = SparQDataFrame(recall_files=file, recall_features=["score", "n", "exact_topk"])[0]
    luka_df = luka_df[luka_df.n == n]
    luka_df = luka_df[luka_df.exact_topk == True]
    luka_df.loc[:, "b"] = 1
    luka_df.loc[:, "k_mult"] = 1
    luka_df.loc[:, "k_b"] = 96
    
    alberto_df = SparQDataFrame(speed_files="../data/measure_speed_sparq.jsonl")
    alberto_df = alberto_df[alberto_df.b == 1]
    alberto_df = alberto_df[alberto_df.n == n]

    
    
    dfs += [pd.merge(luka_df, alberto_df)]

all_df = pd.concat(dfs)

SparQDataFrame._save_dataframe("../data/sparq-data-exact-merged.jsonl", all_df)

all_df

# alberto_df = pd.read_json("../data/measure_speed_sparq.jsonl", lines=True)
# alberto_df = alberto_df[alberto_df.topk_size == 1659]
# print(alberto_df)

recall features: 100%|██████████| 10/10 [00:00<00:00, 1034.51it/s]
speed features: 100%|██████████| 1122/1122 [00:01<00:00, 997.15it/s] 
recall features: 100%|██████████| 10/10 [00:00<00:00, 1553.10it/s]
speed features: 100%|██████████| 1139/1139 [00:01<00:00, 984.34it/s]


Unnamed: 0,score,n,exact_topk,k,b,k_mult,k_b,interleaved,recall,error,...,batch_size,method,compile,duration_mean,duration_stdv,cost_basic,cost_serial,cost_parallel,dtype,n_inner
0,211.092,1659,True,96,1.0,1.0,96.0,True,,,...,1,approx_topk.torch_default.topk,,0.000486,7e-06,12583.452789,77615.307462,399.950603,float16,32
1,211.092,1659,True,96,1.0,1.0,96.0,True,,,...,1,approx_topk.torch_default.topk,default,0.000538,5e-06,12583.452789,77615.307462,399.950603,float16,32
0,0.8105,1409,True,96,1.0,1.0,96.0,True,,,...,1,approx_topk.torch_default.topk,,0.000473,4e-06,10687.212164,64591.129432,386.209569,float16,32
1,0.8105,1409,True,96,1.0,1.0,96.0,True,,,...,1,approx_topk.torch_default.topk,default,0.000521,4e-06,10687.212164,64591.129432,386.209569,float16,32


In [61]:
# repetition_df = SparQDataFrame(
#     recall_files="../data/sparq_v1.jsonl",
#     recall_features=["score", "n", "bucket_topk"],
#     speed_files="../data/measure_speed_sparq.jsonl",
# )
# repetition_df = repetition_df[repetition_df.n == 1659]


# squad_df = SparQDataFrame(
#     recall_files="../data/sparq_v2.jsonl",
#     recall_features=["score", "n", "bucket_topk"],
#     speed_files="../data/measure_speed_sparq.jsonl",
# )
# squad_df = squad_df[squad_df.n == 1409]

# print(repetition_df)
print([*set(repetition_df.bucket_topk)])

merged_df = pd.concat([squad_df, repetition_df])

merged_df = merged_df[merged_df.bucket_topk == False]

# SparQDataFrame._save_dataframe("../data/sparq-data-merged-exact.jsonl", merged_df)

recall features: 100%|██████████| 10/10 [00:00<00:00, 1032.75it/s]
speed features: 100%|██████████| 782/782 [00:00<00:00, 998.64it/s] 

[True]





In [26]:
def synth_recall(row):
    try:
        value = theoretical_models.recall.simulation(
            k=row["k"],
            n=row["n"],
            m=row["batch_size"],
            b=row["b"],
            k_b=row["k_b"],
            reps=5,
        )[0]
        print("Wow")
        return value
    except:
        return None


filename = "../data/synth-data-merged-v2.jsonl"

synth_df = SynthDataFrame(
    save_file=filename,
    speed_files=[
        # "../data/measure_speed_approx_finer.jsonl",
        "../data/measure_speed_exact_finer.jsonl",
        "../data/measure_speed_approx_finer_multi2.jsonl",
        "../data/measure_speed_approx_finer_max_k.jsonl",
    ],
)

print(len(synth_df))

# synth_df.loc[:, "recall_simulation"] = synth_df.apply(synth_recall, axis=1)

# synth_df

# with open(filename, "w") as file:
#     match filename.split(".")[-1]:
#         case "csv":
#             file.write(synth_df.to_csv())
#         case "json":
#             file.write(synth_df.to_json(orient="records"))
#         case "jsonl":
#             file.write(synth_df.to_json(orient="records", lines=True))



# df = pd.read_json("../data/synth-data-merged-v2.jsonl", lines=True)
# df = pd.read_json("../data/measure_speed_approx_finer.jsonl", lines=True)
# df = df[df.method == "approx_topk.bucket_argmax.topk_torch"]
# df = df[df.batch_size == 256]
# df = df[df.k == 256]

# df

speed features: 100%|██████████| 119/119 [00:02<00:00, 48.41it/s]

2436



