# Introduction


To understand what we are doing here, let's take a look at the image below:


![alt text](https://github.com/PangeAI/simms/blob/main/assets/cosine-batch-layout-grid.jpg?raw=true)


We have 1.5 million references and 100k queries and want a stupidly large matrix of scores, with 1.5 million rows and 100k columns, where each entry is the result of a pairwise GreedyCosine comparison. All entries are independent and can be computed in parallel. Even with a high CPU count this is slow: my machine has 8 CPUs, and I estimate it would take about 200 hours.
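
To get a feel for these numbers, here is a back-of-the-envelope sketch. It assumes that the roughly 200 hour figure refers to the full matrix and that every pair stores two float32 values (score and match count); the variable names are only illustrative.

```python
# Problem size quoted in the text.
n_references = 1_500_000
n_queries = 100_000
total_pairs = n_references * n_queries

# Assumption: the ~200 hour CPU estimate covers the full matrix.
cpu_hours = 200
cpu_pairs_per_sec = total_pairs / (cpu_hours * 3600)

# Assumption: each pair keeps a float32 score and a float32 match count.
bytes_per_pair = 2 * 4
total_terabytes = total_pairs * bytes_per_pair / 1e12

print(f"{total_pairs:.2e} pairs in total")
print(f"{cpu_pairs_per_sec:,.0f} pairs/sec implied by the CPU estimate")
print(f"{total_terabytes:.1f} TB of raw float32 output")
```

The last number is a reminder that the full dense matrix is far too large to hold in memory at once, which is one more reason to work batch by batch.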

GPUs are fundamentally a large 2D grid of very small CPUs. There are several ways to make our problem "fit" the GPU environment, and I have chosen the layout shown above.

The GPU processes a single batch at a time. Per-batch processing is near-instantaneous, regardless of batch size, as long as the batch fits into memory.
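
Whether a batch "fits into memory" can be estimated roughly. The sketch below assumes float32 inputs, a batch side of 4096 and at most 1000 peaks per spectrum; all three are assumptions for illustration, and per-thread scratch memory is ignored.

```python
BATCH_SIZE = 4096   # assumed batch side (references x queries)
MAX_PEAKS = 1000    # assumed upper bound on peaks per spectrum
FLOAT_BYTES = 4     # float32

# Inputs: references and queries, each shaped [2, BATCH_SIZE, MAX_PEAKS] (mz and intensity).
input_bytes = 2 * (2 * BATCH_SIZE * MAX_PEAKS * FLOAT_BYTES)
# Lengths: [2, BATCH_SIZE] int32.
lens_bytes = 2 * BATCH_SIZE * 4
# Outputs: [BATCH_SIZE, BATCH_SIZE, 2] float32 and [BATCH_SIZE, BATCH_SIZE, 1] uint8.
out_bytes = BATCH_SIZE * BATCH_SIZE * 2 * FLOAT_BYTES
overflow_bytes = BATCH_SIZE * BATCH_SIZE * 1

total_bytes = input_bytes + lens_bytes + out_bytes + overflow_bytes
print(f"~{total_bytes / 1e6:.0f} MB of GPU memory per batch")
```

Under these assumptions the output arrays dominate, and they grow with the square of the batch side.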

So every batch is a 2D grid of references and queries that are compared pairwise by different threads. If we zoom into batch #0, we see:


![alt text](https://github.com/PangeAI/simms/blob/main/assets/cosine-batch-layout-batch.jpg?raw=true "Title")


This means that the GPU has a separate small CPU (thread) for every pair in the cartesian product of references and queries in that batch. Every thread takes in its own reference and query and returns three values:
score (float), num_matches (int, but cast to float), overflow (bool).
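
How a thread finds "its" pair can be emulated on the CPU. The following is a plain-Python sketch of the usual CUDA index arithmetic (block index times block size plus thread index); the tiny sizes and the helper name `global_index` are made up for illustration, and no GPU is involved.

```python
import math

BATCH_SIZE = 8               # tiny batch so the result is easy to inspect
THREADS_PER_BLOCK = (4, 4)   # small block size chosen only for this sketch
BLOCKS_PER_GRID = (
    math.ceil(BATCH_SIZE / THREADS_PER_BLOCK[0]),
    math.ceil(BATCH_SIZE / THREADS_PER_BLOCK[1]),
)


def global_index(block_idx, thread_idx, block_dim):
    """Position of a thread along one axis of the whole grid."""
    return block_idx * block_dim + thread_idx


pairs = set()
for bx in range(BLOCKS_PER_GRID[0]):
    for by in range(BLOCKS_PER_GRID[1]):
        for tx in range(THREADS_PER_BLOCK[0]):
            for ty in range(THREADS_PER_BLOCK[1]):
                i = global_index(bx, tx, THREADS_PER_BLOCK[0])  # reference index
                j = global_index(by, ty, THREADS_PER_BLOCK[1])  # query index
                # Threads that fall outside the batch do nothing.
                if i < BATCH_SIZE and j < BATCH_SIZE:
                    pairs.add((i, j))

print(len(pairs))  # one thread per (reference, query) pair
```

On a real GPU the four loops do not exist: every (block, thread) combination runs at the same time.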

If we further zoom into the first thread, we see this pseudo-code being executed:


![alt text](https://github.com/PangeAI/simms/blob/main/assets/cosine-batch-layout-thread.jpg?raw=true "Title")

This code is what is called a CUDA kernel - and it is exactly the same for every single thread in all batches. What changes is the input data (per batch) and which reference and query we work with (per thread).

The algorithm has two parts.

The first loop collects all possible mz-mz pairs (up to MATCH_LIMIT of them), and reports an overflow if that limit is exceeded.
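
A CPU-only sketch of this first loop is given below. It assumes both mz lists are sorted in ascending order; `collect_matches` and its default values are hypothetical names chosen for this example, not part of the notebook.

```python
def collect_matches(mz1, mz2, tolerance=0.1, shift=0.0, match_limit=4):
    """Collect (peak1_idx, peak2_idx) pairs whose mz values agree within tolerance."""
    matches = []
    overflow = False
    lowest_idx = 0
    for i, mz in enumerate(mz1):
        low, high = mz - tolerance, mz + tolerance
        for j in range(lowest_idx, len(mz2)):
            shifted = mz2[j] + shift
            if shifted > high:
                break  # mz2 is sorted, so later peaks are even further away
            if shifted < low:
                lowest_idx = j  # peaks before j can never match later mz1 peaks
            elif len(matches) < match_limit:
                matches.append((i, j))
            else:
                overflow = True  # more candidates than we have room for
                break
    return matches, overflow


mz_ref = [100.0, 200.0, 300.0]
mz_query = [100.05, 199.95, 200.02, 350.0]
matches, overflow = collect_matches(mz_ref, mz_query)
small_matches, small_overflow = collect_matches(mz_ref, mz_query, match_limit=2)
```

With room for four matches nothing overflows; with `match_limit=2` the third candidate is dropped and the overflow flag is set.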

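The second loop is essentially a selection sort (the code comments call it a bubble sort). Since "sorted()" isn't available to CUDA threads, we have to loop over the matches manually (nested loop) and, while we have leftover matches:
- Get the match with the largest score
- Add it to the total and discard all other matches that share a peak index with it

Once no usable matches are left, we normalize the total score.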
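
The greedy selection and the final normalization can be sketched in plain Python as follows. This assumes an mz power of 0 and an intensity power of 1, so each match is weighted by the product of the two intensities; `greedy_score` and the toy data are hypothetical.

```python
import math


def greedy_score(matches, int1, int2):
    """Greedy cosine score from candidate (peak1_idx, peak2_idx) pairs."""
    remaining = list(matches)
    score = 0.0
    used = 0
    while remaining:
        # Get the pair with the largest intensity product.
        best = max(remaining, key=lambda m: int1[m[0]] * int2[m[1]])
        score += int1[best[0]] * int2[best[1]]
        used += 1
        # Discard every pair that reuses either peak of the chosen pair.
        remaining = [m for m in remaining if m[0] != best[0] and m[1] != best[1]]
    # Normalize by the product of the two spectrum norms.
    norm = math.sqrt(sum(x * x for x in int1) * sum(y * y for y in int2))
    return score / norm, used


int_ref = [1.0, 0.5]
int_query = [0.2, 1.0, 0.5]
candidates = [(0, 0), (0, 1), (1, 1), (1, 2)]
score, used = greedy_score(candidates, int_ref, int_query)
```

Here the pair (0, 1) wins first, which removes (0, 0) and (1, 1); the pair (1, 2) is then the only one left, so two matches are used.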

## Setup
Download Data, Define Kernel, Data IO, Utilities

In [1]:
try:
    import google.colab

    print("Running in colab. Installing required libraries.")
    ! pip install -qq matchms numba joblib
    ! mkdir -p data/input
    ! cd data/input && wget -q https://storage.googleapis.com/PangeAI-tmp-io/example_dataset_tornike.csv
except ImportError:
    print("Not running in colab. Skipping install.")

Not running in colab. Skipping install.


### Kernel

In [2]:
%load_ext autoreload
%autoreload 2

import math
import numba
from numba import cuda, types
from numba.cuda.cudadrv.devicearray import DeviceNDArray


def compile(
    tolerance: float = 0.1,
    shift: float = 0,
    mz_power: float = 0.0,
    int_power: float = 1.0,
    match_limit: int = 128,
    batch_size: int = 4096,
) -> callable:
    """
    JIT compiles the kernel for CUDA device, and bakes in constants (tolerance, shift, etc.)
    Returns a callable that takes in arguments:
        rspec_cu: DeviceNDArray, [2, R, M] float32
        qspec_cu: DeviceNDArray, [2, Q, N] float32
            The "2" in front is because mz and int are stacked on top of each other
        lens_cu: DeviceNDArray, [2, max(R,Q)] int32
            Number of real (non-padding) peaks per spectrum: row 0 for references, row 1 for queries
        out_cu: DeviceNDArray, [R,Q,2] float32
            Contains both score (0) and counts (1) for each RQ pair
        overflow_cu: DeviceNDArray, [R,Q,1] uint8
            Contains 1 if overflow happened at RQ
        stream: cuda.stream
            Necessary to keep GPU as busy as possible.

    This callable will run JIT-ed cuda kernel. All arguments must already reside in GPU memory.
    First-time use will cause the kernel "warm-up", so subsequent runs will be much faster.
    """
    assert cuda.detect(), "Cuda seems to be unavailable"
    MATCH_LIMIT = match_limit
    R, Q = batch_size, batch_size
    THREADS_PER_BLOCK = (32, 32)
    BLOCKS_PER_GRID_X = math.ceil(R / THREADS_PER_BLOCK[0])
    BLOCKS_PER_GRID_Y = math.ceil(Q / THREADS_PER_BLOCK[1])
    BLOCKS_PER_GRID = (BLOCKS_PER_GRID_X, BLOCKS_PER_GRID_Y)

    @cuda.jit
    def _kernel(
        rspec: DeviceNDArray,
        qspec: DeviceNDArray,
        lens: DeviceNDArray,
        out: DeviceNDArray,
        overflow: DeviceNDArray,
    ):
        i, j = cuda.grid(2)
        thread_i = cuda.threadIdx.x
        thread_j = cuda.threadIdx.y
        block_size_x = cuda.blockDim.x
        block_size_y = cuda.blockDim.y

        # mem = cuda.shared.array((8, ))
        # We aren't out of the RxQ grid
        if i < R and j < Q:
            # Init values (we expect these to be uninitialized)
            overflow[i, j] = 0
            out[i, j] = 0

            # mem = cuda.shared.array((4, 4, 4, 32), types.float32)
            rmz = rspec[0]
            rint = rspec[1]
            qmz = qspec[0]
            qint = qspec[1]
            # In this i,j, We get length of r and q spectrums
            # since they are batched, there might be extra filler elements
            rlen = lens[0]
            qlen = lens[1]

            rleni = rlen[i]
            qlenj = qlen[j]

            # When we have batch that is incomplete (size is indivisible by B)
            # we return quickly to avoid writing garbage there.
            if rleni == 0 or qlenj == 0:
                return

            spec1_mz = rmz[i]
            spec1_int = rint[i]

            spec2_mz = qmz[j]
            spec2_int = qint[j]

            lowest_idx = types.int32(0)
            num_match = types.int32(0)

            matches = cuda.local.array((2, MATCH_LIMIT), types.int16)
            for peak1_idx in range(rleni):
                mz = spec1_mz[peak1_idx]

                low_bound = mz - tolerance
                high_bound = mz + tolerance

                for peak2_idx in range(lowest_idx, qlenj):
                    mz2 = spec2_mz[peak2_idx] + shift
                    if mz2 > high_bound:
                        break
                    if mz2 < low_bound:
                        lowest_idx = peak2_idx
                    else:
                        if num_match < MATCH_LIMIT:
                            matches[0, num_match] = peak1_idx
                            matches[1, num_match] = peak2_idx
                            num_match += 1
                        else:
                            overflow[i, j, 0] = 1  # This is the errorcode for overflow
                            break

            if num_match == 0:
                return

            # SLOW, calculate norm ( This should be done in several threads )
            # score_norm = types.float32(0.0)
            score_norm = types.float32(1.0)
            score_norm_spec1 = types.float32(0.0)
            score_norm_spec2 = types.float32(0.0)

            for peak1_idx in range(rleni):
                score_norm_spec1 += (
                    (spec1_mz[peak1_idx] ** mz_power)
                    * (spec1_int[peak1_idx] ** int_power)
                ) ** 2
            for peak2_idx in range(qlenj):
                score_norm_spec2 += (
                    (spec2_mz[peak2_idx] ** mz_power)
                    * (spec2_int[peak2_idx] ** int_power)
                ) ** 2
            score_norm = math.sqrt(score_norm_spec1 * score_norm_spec2)

            # Quite slow - Bubble sort (This should also be done in several threads)
            # We need two cases, bubble sort up to 50 elems is fine
            score = types.float32(0.0)
            used_matches = types.int32(0)
            for _ in range(0, num_match):
                max_prod = types.float64(-1.0)
                max_peak1_idx = -1
                max_peak2_idx = -1

                for sj in range(0, num_match):
                    if matches[0, sj] >= 0:
                        peak1_idx = matches[0, sj]
                        peak2_idx = matches[1, sj]

                        power_prod_spec1 = (spec1_mz[peak1_idx] ** mz_power) * (
                            spec1_int[peak1_idx] ** int_power
                        )
                        power_prod_spec2 = (spec2_mz[peak2_idx] ** mz_power) * (
                            spec2_int[peak2_idx] ** int_power
                        )
                        prod = power_prod_spec1 * power_prod_spec2
                        if prod > max_prod:
                            max_prod = prod
                            max_peak1_idx = peak1_idx
                            max_peak2_idx = peak2_idx

                if max_prod > 0:
                    for sj in range(0, num_match):
                        if (
                            matches[0, sj] == max_peak1_idx
                            or matches[1, sj] == max_peak2_idx
                        ):
                            matches[0, sj] = -1  # "Remove" it
                            matches[1, sj] = -1  # "Remove" it
                    score += max_prod
                    used_matches += 1

                if max_prod < 0:
                    break

            score = score / score_norm

            out[i, j, 0] = score
            out[i, j, 1] = used_matches

    def kernel(
        rspec_cu: DeviceNDArray,
        qspec_cu: DeviceNDArray,
        lens_cu: DeviceNDArray,
        out_cu: DeviceNDArray,
        overflow_cu: DeviceNDArray,
        stream: cuda.stream,
    ):
        _kernel[BLOCKS_PER_GRID, THREADS_PER_BLOCK, stream](
            rspec_cu,
            qspec_cu,
            lens_cu,
            out_cu,
            overflow_cu,
        )

    return kernel

### Data IO

In [3]:
import json
from pathlib import Path
import numpy as np
import pandas as pd
from joblib import Parallel, delayed
from matchms import Spectrum
from matchms.filtering import (
    add_losses,
    normalize_intensities,
    reduce_to_number_of_peaks,
    require_minimum_number_of_peaks,
    select_by_mz,
    select_by_relative_intensity,
)
from tqdm import tqdm


def spectra_peaks_to_tensor(
    spectra: list, dtype: str = "float32"
) -> tuple[np.ndarray, np.ndarray]:
    """
    Working with GPU requires us to have a fixed shape for mz/int arrays.
    This isn't the case for real-life data, so we have to "pad" the mz/int arrays.
    We keep the real size of each mz/int array in a separate array, "batch". The regions
    beyond what "batch" specifies are undefined.

    Returns:
        spec: [2, len(spectra), max_peaks] array of `dtype` (mz at index 0, intensities at index 1)
        batch: [len(spectra)] int32
    """
    sp_max_shape = max(len(s.peaks) for s in spectra)
    mz = np.empty((len(spectra), sp_max_shape), dtype=dtype)
    # Named `intensities` so that we don't shadow the builtin `int`
    intensities = np.empty((len(spectra), sp_max_shape), dtype=dtype)
    batch = np.empty(len(spectra), dtype=np.int32)
    for i, s in enumerate(spectra):
        # .to_numpy creates an unneeded copy - we don't need to do that twice
        mz[i, : len(s.peaks)] = s._peaks.mz
        intensities[i, : len(s.peaks)] = s._peaks.intensities
        batch[i] = len(s.peaks)
    spec = np.stack([mz, intensities], axis=0)
    return spec, batch


def get_ref_spectra_from_df(spectra_df, limit=None) -> list:
    """
    This function takes a dataframe with spectra and returns a list of matchms spectra.
    Since all rows are independent, the preprocessing is done in parallel (CPU).
    Spectra rejected by the filters in `process_spectrum` are dropped from the result.
    """

    # for index, row in spectra_df.iterrows():
    def fn(index, row):
        pbid = row["pbid"]
        precursor_mz = row["precursor_mz"]
        smiles = row["pb_smiles"]
        inchikey = row["pb_inchikey"]
        mz_array = np.array(json.loads(row["peaks_mz"]))
        intensity_array = np.array(json.loads(row["peaks_intensities"]))
        sp = Spectrum(
            mz=mz_array,
            intensities=intensity_array,
            metadata={
                "id": pbid,
                "precursor_mz": precursor_mz,
                "smiles": smiles,
                "inchikey": inchikey,
            },
        )
        sp = process_spectrum(sp)
        return sp

    if limit is not None:
        spectra_df = spectra_df.head(limit)
    spectra = Parallel(-2)(
        delayed(fn)(index, row)
        for index, row in tqdm(spectra_df.iterrows(), total=len(spectra_df))
    )
    spectra = [s for s in spectra if s is not None]
    return spectra


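def process_spectrum(spectrum: Spectrum) -> Spectrum:
    # Note: the result is None when the spectrum fails a requirement
    # (here: fewer than 5 peaks left); callers drop those entries.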
    spectrum = select_by_mz(spectrum, mz_from=10.0, mz_to=1000.0)
    spectrum = normalize_intensities(spectrum)
    spectrum = select_by_relative_intensity(spectrum, intensity_from=0.001)
    spectrum = reduce_to_number_of_peaks(spectrum, n_max=1000)
    spectrum = require_minimum_number_of_peaks(spectrum, n_required=5)
    return spectrum


def batches(lst, batch_size):
    """
    Split a list into consecutive slices of length batch_size. The last batch may be shorter.
    """
    for i in range(0, len(lst), batch_size):
        yield lst[i : i + batch_size]


def mkdir(p: Path) -> Path:
    p = Path(p)
    # parents=True so that nested paths such as "data/output/" work on a fresh checkout
    p.mkdir(parents=True, exist_ok=True)
    return p


def argbatch(lst, batch_size):
    """
    Yield (start, end) index pairs for consecutive batches of a list.
    The last end index may exceed len(lst); slicing with it is still safe.
    """
    for i in range(0, len(lst), batch_size):
        yield i, i + batch_size
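
The padding idea described in `spectra_peaks_to_tensor` can be illustrated with plain lists. This is a minimal sketch: `pad_to_tensor` is a hypothetical helper, and unlike the function above it fills the padding with zeros so the result is easy to read.

```python
import numpy as np


def pad_to_tensor(arrays, dtype="float32"):
    """Stack variable-length 1D arrays into one 2D array plus their true lengths."""
    lengths = np.array([len(a) for a in arrays], dtype=np.int32)
    padded = np.zeros((len(arrays), int(lengths.max())), dtype=dtype)
    for row, a in enumerate(arrays):
        padded[row, : len(a)] = a  # everything right of len(a) stays padding
    return padded, lengths


mz_lists = [[100.0, 200.0, 300.0], [150.0]]
padded, lengths = pad_to_tensor(mz_lists)
print(padded.shape, lengths)
```

The kernel only ever reads the first `lengths[i]` entries of row `i`, which is why the padding values themselves do not matter.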

## Main Part


In [4]:
import math
import re
from itertools import product
from multiprocessing import shared_memory
from multiprocessing.pool import ThreadPool
from pathlib import Path
from time import perf_counter
import numpy as np
import pandas as pd
from numba import cuda
from tqdm import tqdm

In [5]:
## Define constants
tolerance: float = 0.1
shift: float = 0
mz_power: float = 0
int_power: float = 1

## Side length of a batch: each batch compares BATCH_SIZE references with BATCH_SIZE queries.
## Has to be a power of 2.
# Hardware specific - an RTX 2070 works best at around 1024 * 2
# But a Colab T4 GPU might work best at 1024 * 4
BATCH_SIZE = 1024

# MATCH_LIMIT specifies the maximum number of mz-mz pairs we consider for each RQ pair, before we sort and filter.
# E.g. a value of 256 usually causes around ~0.003% of RQ pairs to "overflow".
# The scores of overflowed RQ pairs will be less than or equal to the perfectly accurate score.
# The mean absolute difference at 256, over all overflowed pairs, is on the order of ~1e-3.
# Small values of MATCH_LIMIT (e.g. 128, 64) cause a dramatic speedup in processing speed.
MATCH_LIMIT = 256

## GPU-specific constants
THREADS_PER_BLOCK = (32, 32)
BLOCKS_PER_GRID_X = math.ceil(BATCH_SIZE / THREADS_PER_BLOCK[0])
BLOCKS_PER_GRID_Y = math.ceil(BATCH_SIZE / THREADS_PER_BLOCK[1])
BLOCKS_PER_GRID = (BLOCKS_PER_GRID_X, BLOCKS_PER_GRID_Y)

# Greedy cosine is an unstable algorithm: approximately equal mz values do not
# necessarily result in approximately the same scores and number of matches.
# So we need to use fp64 to minimize the deviation as much as possible.
# Using float32 instead causes a significant speedup in processing speed.
dtype = "float64"

# Data path
reference_csv_file = "data/input/example_dataset_tornike.csv"
query_csv_file = "data/input/example_dataset_tornike.csv"
output_dir = "data/output/"

# Limits
# We consider only first LIMIT number of entries in CSVs
LIMIT = 2048

# For keeping track of experiments
CONFIG = dict(
    tolerance=tolerance,
    shift=shift,
    mz_power=mz_power,
    int_power=int_power,
    match_limit=MATCH_LIMIT,
    batch_size=BATCH_SIZE,
    limit=LIMIT,
)

In [6]:
# We load the CSV files and preprocess the spectra in parallel (joblib workers)
ref_spectra_df_path = Path(reference_csv_file)
ref_spectra_df = pd.read_csv(ref_spectra_df_path)
references = get_ref_spectra_from_df(ref_spectra_df, limit=LIMIT)

query_spectra_df_path = Path(query_csv_file)
query_spectra_df = pd.read_csv(query_spectra_df_path)
queries = get_ref_spectra_from_df(query_spectra_df, limit=LIMIT)

# Note: these are the raw CSV row counts, before LIMIT and the spectrum filters are applied
print(f"We have {len(ref_spectra_df)} references and {len(query_spectra_df)} queries")

100%|██████████| 2048/2048 [00:02<00:00, 691.79it/s] 
100%|██████████| 2048/2048 [00:00<00:00, 4988.86it/s]


We have 100001 references and 100001 queries


In [7]:
# Numba Just-in-time compiles our kernel and bakes in our constants for performance.
kernel = compile(
    tolerance=tolerance,
    shift=shift,
    mz_power=mz_power,
    int_power=int_power,
    match_limit=MATCH_LIMIT,
    batch_size=BATCH_SIZE,
)

Found 1 CUDA devices
id 0    b'NVIDIA GeForce RTX 2070 with Max-Q Design'                              [SUPPORTED]
                      Compute Capability: 7.5
                           PCI Device ID: 0
                              PCI Bus ID: 1
                                    UUID: GPU-f6e241c8-f0ad-720e-be22-2713a6b0868d
                                Watchdog: Enabled
             FP32/FP64 Performance Ratio: 32
Summary:
	1/1 devices are supported


In [8]:
output_dir = mkdir(output_dir)

TOTAL_BATCHES_X = math.ceil(len(references) / BATCH_SIZE)
TOTAL_BATCHES_Y = math.ceil(len(queries) / BATCH_SIZE)
TOTAL_BATCHES = TOTAL_BATCHES_X * TOTAL_BATCHES_Y
print("Total batches: ", TOTAL_BATCHES)
print(
    f"Total pairs considered: {len(references)} * {len(queries)} = {len(references) * len(queries)}"
)

if len(references) % BATCH_SIZE != 0:
    print(
        f"Since {len(references)} isn't divisible by BATCH_SIZE, last batch will have {BATCH_SIZE - len(references) % BATCH_SIZE} empty ROWS at the end"
    )
if len(queries) % BATCH_SIZE != 0:
    print(
        f"Since {len(queries)} isn't divisible by BATCH_SIZE, last batch will have {BATCH_SIZE - len(queries) % BATCH_SIZE} empty COLUMNS at the end"
    )

Total batches:  4
Total pairs considered: 1993 * 1993 = 3972049
Since 1993 isn't divisible by BATCH_SIZE, last batch will have 55 empty ROWS at the end
Since 1993 isn't divisible by BATCH_SIZE, last batch will have 55 empty COLUMNS at the end


In [9]:
# Load each batch in memory so that we don't have to load any R,Q twice
batches_r = []
for rbatch in tqdm(batches(references, BATCH_SIZE), desc="Batch all references"):
    rspec, rlen = spectra_peaks_to_tensor(rbatch, dtype=dtype)
    batches_r.append([rspec, rlen])

batches_q = list()
for qbatch in tqdm(batches(queries, BATCH_SIZE), desc="Batch all queries"):
    qspec, qlen = spectra_peaks_to_tensor(qbatch, dtype=dtype)
    batches_q.append([qspec, qlen])

Batch all references: 0it [00:00, ?it/s]

Batch all references: 2it [00:00, 20.05it/s]
Batch all queries: 2it [00:00, 21.47it/s]


# Main loop

In [10]:
streams = [cuda.stream() for _ in range(TOTAL_BATCHES)]

batches_r = []
for bstart, bend in tqdm(argbatch(references, BATCH_SIZE), desc="Batch all references"):
    rbatch = references[bstart:bend]
    rspec, rlen = spectra_peaks_to_tensor(rbatch, dtype=dtype)
    batches_r.append([rspec, rlen, bstart, bend])

batches_q = list()
for bstart, bend in tqdm(argbatch(queries, BATCH_SIZE), desc="Batch all queries"):
    qbatch = queries[bstart:bend]
    qspec, qlen = spectra_peaks_to_tensor(qbatch, dtype=dtype)
    batches_q.append([qspec, qlen, bstart, bend])

batches_rq = list(product(batches_r, batches_q))

Batch all references: 2it [00:00, 29.69it/s]
Batch all queries: 2it [00:00, 28.42it/s]


In [11]:
! rm -rf data/output/*

start = perf_counter()
# We initialize a pool of 3 workers that will offload results to disk
with ThreadPool(3) as pool:
    # We loop over all batches in sequence
    for batch_i in tqdm(range(TOTAL_BATCHES)):

        # Each batch has own CUDA stream so that the GPU is as busy as possible
        stream = streams[batch_i]

        # Shared memory allows pool workers to read array without copying it
        out_shm = shared_memory.SharedMemory(
            create=True, size=(BATCH_SIZE * BATCH_SIZE * 2 * 4)
        )
        out = np.ndarray(
            shape=(BATCH_SIZE, BATCH_SIZE, 2), dtype="float32", buffer=out_shm.buf
        )
        overflow_shm = shared_memory.SharedMemory(
            create=True, size=(BATCH_SIZE * BATCH_SIZE * 1 * 1)
        )
        overflow = np.ndarray(
            shape=(BATCH_SIZE, BATCH_SIZE, 1), dtype="uint8", buffer=overflow_shm.buf
        )

        # We order empty space for results on GPU RAM
        out_cu = cuda.device_array(
            (BATCH_SIZE, BATCH_SIZE, 2), dtype="float32", stream=stream
        )
        overflow_cu = cuda.device_array(
            (BATCH_SIZE, BATCH_SIZE, 1), dtype="uint8", stream=stream
        )

        # We get our batch and lengths (lengths are different for different spectra)
        (rspec, rlen, rstart, rend), (qspec, qlen, qstart, qend) = batches_rq[batch_i]
        lens = np.zeros((2, BATCH_SIZE), "int32")
        lens[0, : len(rlen)] = rlen
        lens[1, : len(qlen)] = qlen

        # We pin (page-lock) the host arrays in CPU RAM so that transfers to and from the GPU can run asynchronously
        with cuda.pinned(
            rspec,
            qspec,
            lens,
            out,
            overflow,
        ):

            # We order the stream to copy input data to GPU RAM
            rspec_cu = cuda.to_device(rspec, stream=stream)
            qspec_cu = cuda.to_device(qspec, stream=stream)
            lens_cu = cuda.to_device(lens, stream=stream)

            # We order the stream to execute kernel (this is scheduled, it will execute, but we can't force it)
            kernel(rspec_cu, qspec_cu, lens_cu, out_cu, overflow_cu, stream=stream)

            # We order a data return
            out_cu.copy_to_host(out, stream=stream)
            overflow_cu.copy_to_host(overflow, stream=stream)

            # We create a function that will execute when this stream is done working.
            # It is important to be quick here, so the main work of writing to disk
            # is handled by pool workers, not by the callback itself.
            # Names and bounds are bound as default arguments so that each callback
            # keeps the values of its own batch, not those of a later loop iteration.
            def end_of_stream_callback(
                *args,
                names=(out_shm.name, overflow_shm.name),
                bounds=(rstart, rend, qstart, qend),
            ):
                def thread_worker(name1, name2, rstart, rend, qstart, qend):
                    ex_shm = shared_memory.SharedMemory(name=name1)
                    out = np.ndarray(
                        shape=(BATCH_SIZE, BATCH_SIZE, 2),
                        dtype=np.float32,
                        buffer=ex_shm.buf,
                    )
                    np.save(
                        f"data/output/{rstart}-{rend}.{qstart}-{qend}.score.npy", out
                    )

                    ex_shm.unlink()
                    ex_shm = shared_memory.SharedMemory(name=name2)
                    overflow = np.ndarray(
                        shape=(BATCH_SIZE, BATCH_SIZE, 1),
                        dtype=np.uint8,
                        buffer=ex_shm.buf,
                    )
                    np.save(
                        f"data/output/{rstart}-{rend}.{qstart}-{qend}.ovfl.npy",
                        overflow,
                    )
                    ex_shm.unlink()

                pool.apply_async(
                    thread_worker,
                    args=[*names, *bounds],
                    error_callback=lambda e: print("Thread error", e),
                )

            stream.add_callback(
                callback=end_of_stream_callback,
            )

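    # We wait for all streams to finish their work everywhere, then let the
    # pool workers finish writing to disk before the pool is shut down
    cuda.synchronize()
    pool.close()
    pool.join()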

# We can now calculate our performance fairly
duration = perf_counter() - start
persec = len(references) * len(queries) / duration
print(f"Speed at {persec:.1f} pairs/sec")
print(f"Estimated {(100_000 * 1_500_000 / persec) / 3600:.2f}hrs per 100k x 1.5mln")

100%|██████████| 4/4 [00:01<00:00,  2.60it/s]

Speed at 2574696.6 pairs/sec
Estimated 16.18hrs per 100k x 1.5mln





# Filtering and further processing

## Examples

### Query RQ pairs with condition on score

This is still a TODO for large outputs, since filtering gigabytes' worth of numpy arrays will take forever. For now, a CPU implementation should suffice, or we could integrate this "filtering" behaviour directly into the kernel.
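
One possible way to keep CPU-side filtering manageable is to memory-map each score file and scan it in row chunks, so only a slice is resident at a time. This is a sketch under that assumption, not something the notebook does; `filter_scores`, `row_chunk` and the demo file are hypothetical.

```python
import tempfile
from pathlib import Path

import numpy as np


def filter_scores(score_file, min_score, row_chunk=256):
    """Return (row, col) indices whose score is >= min_score, reading chunk by chunk."""
    scores = np.load(score_file, mmap_mode="r")  # memory-mapped, not fully loaded
    hits = []
    for start in range(0, scores.shape[0], row_chunk):
        block = scores[start : start + row_chunk, :, 0]  # channel 0 holds the score
        rows, cols = np.nonzero(block >= min_score)
        hits.append(np.column_stack([rows + start, cols]))
    return np.concatenate(hits)


with tempfile.TemporaryDirectory() as tmp:
    demo_file = Path(tmp) / "demo.score.npy"
    demo = np.zeros((4, 4, 2), dtype="float32")
    demo[1, 2, 0] = 0.9
    demo[3, 0, 0] = 0.8
    np.save(demo_file, demo)
    pairs = filter_scores(demo_file, min_score=0.75, row_chunk=2)

print(pairs)
```

The returned indices are relative to the file; the batch offsets encoded in the file name still have to be added to get positions on the full grid.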

In [12]:
min_score = 0.75  # Min score
results = pd.DataFrame([], columns=["Reference", "Query", "Score", "Num_Matches"])

score_files = sorted(Path(output_dir).glob("*.score.npy"))
for score_file in score_files:
    print(score_file.stem)
    match = re.match(r"(\d+)-(\d+)\.(\d+)-(\d+)", score_file.stem)
    rstart, rend, qstart, qend = map(int, match.groups())
    score = np.load(score_file)

    # Condition query
    pairs_relative = np.argwhere(score[..., 0] >= min_score)
    # We have to pad pairs with their actual locations on full grid
    pairs_absolute = pairs_relative + [rstart, qstart]

    # score, num_matches = get_one_specific(ref_idx, que_idx)
    r, q = pairs_relative.T
    score, num_match = score[r, q].T

    r, q = pairs_absolute.T
    result = pd.DataFrame(
        dict(
            Reference=r.astype("uint32"),
            Query=q.astype("uint32"),
            Score=score.astype("float32"),
            Num_Matches=num_match.astype("uint16"),
        )
    ).convert_dtypes()
    results = pd.concat([results, result], axis=0, copy=False)

print(results.dtypes, "Memory ", results.memory_usage().sum() / 1e6, "MB")

assert (results.Score >= min_score).all(), "Something wrong with filtering!"

results

0-1024.0-1024.score
0-1024.1024-2048.score
1024-2048.0-1024.score
1024-2048.1024-2048.score
Reference       UInt32
Query           UInt32
Score          Float32
Num_Matches     UInt16
dtype: object Memory  1.079728 MB


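| | Reference | Query | Score | Num_Matches |
|---|---|---|---|---|
| 0 | 0 | 0 | 1.0 | 14 |
| 1 | 0 | 1 | 0.990495 | 14 |
| 2 | 0 | 2 | 0.977393 | 11 |
| 3 | 0 | 3 | 0.934253 | 11 |
| 4 | 0 | 4 | 0.877143 | 11 |
| ... | ... | ... | ... | ... |
| 15022 | 1992 | 1273 | 0.8502 | 39 |
| 15023 | 1992 | 1609 | 0.761673 | 33 |
| 15024 | 1992 | 1990 | 0.802216 | 37 |
| 15025 | 1992 | 1991 | 0.945183 | 45 |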


Be careful with how large the `results` dataframe can get! You might run out of RAM before it's all loaded into memory.
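
If the filtered pairs do not fit in RAM, one option is to append each per-file result to a CSV on disk instead of concatenating everything in memory. The sketch below assumes that approach; `append_chunk`, the output file name and the toy rows are made up for illustration.

```python
import tempfile
from pathlib import Path

import pandas as pd


def append_chunk(chunk, csv_path):
    """Append one chunk of results to a CSV file, writing the header only once."""
    csv_path = Path(csv_path)
    chunk.to_csv(csv_path, mode="a", header=not csv_path.exists(), index=False)


with tempfile.TemporaryDirectory() as tmp:
    out_csv = Path(tmp) / "results.csv"
    # Toy chunks standing in for the per-file `result` dataframes.
    chunks = [
        pd.DataFrame({"Reference": [0], "Query": [1], "Score": [0.99], "Num_Matches": [14]}),
        pd.DataFrame({"Reference": [5], "Query": [7], "Score": [0.80], "Num_Matches": [9]}),
    ]
    for chunk in chunks:
        append_chunk(chunk, out_csv)
    n_rows = len(pd.read_csv(out_csv))

print(n_rows)
```

With this pattern only one chunk lives in memory at a time, at the cost of a slower text format on disk.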