# Compaction Project

## Problem Formulation

Given a set of chunks whose sizes are

$$
S_n = \{d_1, \cdots, d_n\},
$$

where each $d_i$ is a positive integer with $d_i \leq 2048$ for $i = 1, \cdots, n$. Suppose a pipeline containing $k$ join operators needs time $\mathcal{F}_k(d_i)$ to process a data chunk of size $d_i$.

Our goal is to compact the set $S_n$, i.e., we need a transformation

$$
\mathcal{M}: S_n \rightarrow S'_m \triangleq \{d'_1,  \cdots, d'_m\},
$$

where $\sum_{i=1}^n d_i = \sum_{j=1}^m d'_j$ and $m \leq n$ is an integer chosen by the transformation, so as to minimize

$$
\sum_{j=1}^m \mathcal{F}_k(d'_j) + \mathrm{cost}(\mathcal{M}, S_n),
$$

where $\mathrm{cost}(\mathcal{M}, S_n)$ is the cost of applying the transformation $\mathcal{M}$ to $S_n$. The cost of combining two or more chunks into one chunk of size $d'_s = d_i + \cdots + d_j \leq 2048$ is

$$
g(d'_s) = C_3 + d'_s \times C_4.
$$

**Note:** This formulated problem is easier than the real compaction problem because we know sizes of all chunks in advance, as opposed to dealing with a stream of chunks.

## Cost Calculation

Suppose the $i$-th join operator needs time $f_i(d) = C_1 + C_2 \cdot d$ to process a data chunk with the size $d$. 

Suppose the pipeline has $k$ join operators. We make two assumptions:

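**Assumption 1**: Each join receives a data chunk and outputs $m$ smaller chunks. (In this section $m$ denotes the split factor of a join, `chunk_factor` in the code, not the number of output chunks $m$ from the problem formulation.)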

**Assumption 2**: The total number of tuples across the $m$ smaller chunks remains the same as the number of tuples in the original input chunk. 

Then, for an input chunk with the size $d$, the needed execution time $\mathcal{F}_k (d)$ consists of two parts: 

1. Per Tuple Cost 

$$ C_2 \cdot d \cdot k $$ 

2. Fixed Cost

$$ C_1 \cdot \min\{m^0, d\} + C_1 \cdot \min\{m^1, d\} + \cdots + C_1 \cdot \min\{m^{k-1}, d\}$$

We take the minimum of $m^{i}$ and $d$ because a data chunk cannot be split into more than $d$ smaller chunks; in other words, each chunk contains at least one tuple.

In [None]:
# import
from termcolor import colored
import numpy as np
import matplotlib.pyplot as plt
import plotly.graph_objects as go

# settings
%matplotlib inline

In [None]:
# utils
def print_color(text, color=None):
    """Print ``text``, colorized with termcolor when a truthy ``color`` is given.

    A falsy ``color`` (``None``, ``""``) prints the text unchanged.
    """
    message = colored(text, color) if color else text
    print(message)

## Query Execution Simulation

In [None]:
# Generate random chunk sizes from a Gaussian distribution
def generate_chunk_sizes(n, mean=64, scale=256, seed=0, max_size=2048):
    """Draw ``n`` chunk sizes from a normal distribution, clamped to [1, max_size].

    A private ``np.random.RandomState(seed)`` is used. Its draws are identical to
    seeding the legacy global generator with ``seed``, but the global NumPy
    random state is left untouched, so other code using ``np.random`` is not
    silently reseeded.

    Args:
        n: number of chunks to generate.
        mean: mean of the normal distribution.
        scale: standard deviation; ``0`` yields ``n`` copies of ``mean``
            (after clamping).
        seed: seed for the private generator.
        max_size: upper clamp for a chunk size (2048 by default).

    Returns:
        A 1-D integer ``np.ndarray`` of length ``n``. Draws are clamped first,
        then fractional parts are truncated.
    """
    rng = np.random.RandomState(seed)
    samples = rng.normal(mean, scale, n)
    return np.clip(samples, 1, max_size).astype(int)

In [None]:
# The simulation of Joining and Compacting
#
# Per-call cost model (reported as microseconds, "us", by `let_us_join`):
#
#             fixed cost      per tuple cost
# probe()     1.5             0.03
# next()      0.9             0.06
# --------------------------------------
# compact()   0.3             0.05
# --------------------------------------
#
# Processing ("prc") one chunk through a join costs probe() + next();
# compacting ("cpt") costs compact().
k_prc_fixed_cost, k_prc_per_tuple_cost = 1.5 + 0.9, 0.03 + 0.06
k_cpt_fixed_cost, k_cpt_per_tuple_cost = 0.3, 0.05

# Split every chunk size into `parts` near-equal, non-empty pieces.
def split_array_np(numbers, parts):
    """Split each size in ``numbers`` into ``parts`` pieces differing by at most 1.

    A size ``s`` becomes ``parts`` pieces of ``s // parts``, and the first
    ``s % parts`` of them get one extra tuple. Pieces of size 0 (possible when
    ``s < parts``) are dropped. The input is flattened, and pieces are returned
    in input order.

    Returns:
        1-D ``np.ndarray`` of piece sizes.
    """
    flat = np.array(numbers).ravel()
    base, extra = np.divmod(flat, parts)
    # Row r, column `slot` receives one extra tuple while slot < extra[r].
    slot = np.arange(parts)
    pieces = (base[:, np.newaxis] + (slot[np.newaxis, :] < extra[:, np.newaxis])).ravel()
    return pieces[pieces > 0]

# Simulate the execution of joins.
def simulate_join(sizes, compact_func, chunk_factor=1, n_join=1, print_log=True):
    """Simulate ``n_join`` chained joins, compacting after each one.

    Every level (1..n_join) performs three steps:
      1. It charges the join cost: ``k_prc_fixed_cost`` per chunk plus
         ``k_prc_per_tuple_cost`` per tuple.
      2. It splits each chunk into ``chunk_factor`` pieces via
         ``split_array_np``.
      3. It calls ``compact_func(sizes, chunk_factor, remaining)``, where
         ``remaining`` is the number of joins still to run after this level,
         and adds the compaction cost that call returns.

    Args:
        sizes: initial chunk sizes.
        compact_func: strategy ``(sizes, chunk_factor, remaining) -> (sizes, cost)``.
        chunk_factor: number of pieces each join splits a chunk into.
        n_join: number of joins in the pipeline.
        print_log: print per-level chunk counts and compaction costs.

    Returns:
        ``(prc_cost, cpt_cost)``: total join cost and total compaction cost.
    """
    current = np.array(sizes)
    prc_cost = 0
    cpt_cost = 0

    if print_log:
        print_color("-------------------------", 'green')
        print_color(f"Compactor {compact_func}", 'green')

    for level in range(1, n_join + 1):
        remaining = n_join - level

        # 1. Join: fixed cost per chunk plus per-tuple cost.
        prc_cost += k_prc_fixed_cost * len(current) + np.sum(current) * k_prc_per_tuple_cost

        # 2. Every chunk fans out into `chunk_factor` pieces.
        current = split_array_np(current, chunk_factor)
        n_before = len(current)

        # 3. Let the strategy merge pieces ahead of the remaining joins.
        current, cost = compact_func(current, chunk_factor, remaining)
        cpt_cost += cost

        if print_log:
            print_color(f"Level {level}: {n_before} -> {len(current)} chunks, cost: {cost:.2f}", 'green')

    return prc_cost, cpt_cost

## Compaction Strategies

In [None]:
# Cost-model helpers: join (processing) cost and compaction cost.
def compute_prc_cost(chunk_size, chunk_factor, n_joins):
    """Estimate the cost of pushing one chunk of ``chunk_size`` tuples through ``n_joins`` joins.

    Per-tuple part: ``k_prc_per_tuple_cost * chunk_size * n_joins``.

    Fixed part: join ``i`` (0-based) processes ``chunk_factor ** i`` chunks,
    capped at ``chunk_size`` because every chunk holds at least one tuple. It
    pays ``k_prc_fixed_cost`` for each of those chunks.

    ``chunk_size`` may be a scalar or a NumPy array (evaluated elementwise).
    """
    per_tuple_cost = k_prc_per_tuple_cost * chunk_size * n_joins
    fixed_cost = sum(
        k_prc_fixed_cost * np.minimum(chunk_factor ** level, chunk_size)
        for level in range(n_joins)
    )
    return per_tuple_cost + fixed_cost


# Compute the cost of a single compaction
def compute_cpt_cost(sizes_in_one_compaction):
    """Return the cost of one compaction: a fixed charge plus a per-tuple copy charge.

    This is g(d') = C3 + d' * C4, where d' is the total number of tuples in
    ``sizes_in_one_compaction``.
    """
    copied_tuples = np.sum(sizes_in_one_compaction)
    return k_cpt_fixed_cost + copied_tuples * k_cpt_per_tuple_cost

### Base Strategies

**Strategy 1**: Do not compact any chunks. 

**Strategy 2**: Fully compact all chunks.

In [None]:
def alg_no_compaction(chunk_sizes, chunk_factor, level):
    """Strategy 1: leave every chunk as it is.

    Returns the input object itself (not a copy) with a compaction cost of 0.
    ``chunk_factor`` and ``level`` are ignored.
    """
    untouched = chunk_sizes
    return untouched, 0

In [None]:
def alg_full_compaction(chunk_sizes, chunk_factor, level, block_size=2048):
    """Strategy 2: greedily pack every non-full chunk into full blocks.

    Chunks are visited in input order. Chunks of exactly ``block_size`` tuples
    pass through untouched. All other chunks are appended to a buffer. When the
    next chunk would overflow the buffer, that chunk is split:
      - its head completes a ``block_size`` block, which is charged one
        ``compute_cpt_cost``;
      - its tail starts the next buffer.
    Whatever is buffered at the end is emitted as a final, possibly partial,
    block and is charged too.

    Args:
        chunk_sizes: input chunk sizes (each assumed to be <= ``block_size``).
        chunk_factor, level: unused; part of the strategy signature.
        block_size: capacity of an output chunk (2048 by default).

    Returns:
        ``(transformed_sizes, cpt_cost)``.
    """
    transformed_sizes = []
    cpt_cost = 0
    cpt_sizes = []
    # Running total of `cpt_sizes`. This avoids re-summing the buffer (which
    # can hold up to `block_size` entries) for every input chunk.
    buffered = 0

    for size in chunk_sizes:
        if size == block_size:
            transformed_sizes.append(size)
            continue

        if buffered + size <= block_size:
            cpt_sizes.append(size)
            buffered += size
        else:
            # Top up the current block with the head of this chunk ...
            l_size = block_size - buffered
            cpt_sizes.append(l_size)

            cpt_cost += compute_cpt_cost(cpt_sizes)
            transformed_sizes.append(buffered + l_size)

            # ... and start the next buffer with its tail.
            cpt_sizes = [size - l_size]
            buffered = size - l_size

    if cpt_sizes:
        cpt_cost += compute_cpt_cost(cpt_sizes)
        transformed_sizes.append(buffered)

    return transformed_sizes, cpt_cost

### Optimal Strategy

**Strategy 3**: Sort all chunks in ascending order. Each compaction absorbs as many small chunks as possible, as long as absorbing the next chunk is still profitable.

In [None]:
def alg_sort_compaction(chunk_sizes, chunk_factor, n_joins, block_size=2048):
    """Strategy 3: merge chunks smallest-first while merging is profitable.

    Chunks are sorted ascending and greedily appended to a buffer. Adding a
    chunk of ``size`` tuples to a buffer holding ``cur_sum`` tuples is accepted
    when the join cost saved over the remaining ``n_joins`` joins,
    ``prc(cur_sum) + prc(size) - prc(cur_sum + size)``, exceeds the cost of
    copying the chunk, ``k_cpt_fixed_cost + size * k_cpt_per_tuple_cost``.

    A chunk that would overflow ``block_size`` closes the buffer and starts a
    new one. At the first unprofitable merge the scan stops, and that chunk and
    all later (not smaller) ones are passed through unchanged. A buffer is
    charged ``compute_cpt_cost`` only if it actually merged two or more chunks.

    Args:
        chunk_sizes: chunk sizes produced by the previous join.
        chunk_factor: fan-out of each join (used by the cost model).
        n_joins: number of joins remaining after this compaction.
        block_size: capacity of an output chunk (2048 by default).

    Returns:
        ``(transformed_sizes, cpt_cost)``. The total of ``transformed_sizes``
        equals the total of ``chunk_sizes``. Empty input yields ``([], 0)``.
    """
    transformed_sizes = []
    cpt_cost = 0
    if len(chunk_sizes) == 0:
        return transformed_sizes, cpt_cost

    sorted_sizes = sorted(chunk_sizes)

    cpt_sizes = [sorted_sizes[0]]
    cur_sum = sorted_sizes[0]  # running total of cpt_sizes
    # Index of the first chunk excluded from compaction. It stays past-the-end
    # unless the scan stops early on an unprofitable merge.
    untouched_from = len(sorted_sizes)

    for i in range(1, len(sorted_sizes)):
        size = sorted_sizes[i]

        if cur_sum + size <= block_size:
            gain = (compute_prc_cost(cur_sum, chunk_factor, n_joins)
                    + compute_prc_cost(size, chunk_factor, n_joins)
                    - compute_prc_cost(cur_sum + size, chunk_factor, n_joins))
            loss = k_cpt_fixed_cost + size * k_cpt_per_tuple_cost

            if gain - loss > 0:
                cpt_sizes.append(size)
                cur_sum += size
            else:
                # Stop merging. sorted_sizes[i] itself was NOT merged, so it
                # must still be emitted with the rest below. Previously it was
                # skipped, which lost its tuples.
                untouched_from = i
                break
        else:
            # This chunk does not fit: close the buffer and start a new one.
            if len(cpt_sizes) > 1:
                cpt_cost += compute_cpt_cost(cpt_sizes)
            transformed_sizes.append(cur_sum)

            cpt_sizes = [size]
            cur_sum = size

    if len(cpt_sizes) > 1:
        cpt_cost += compute_cpt_cost(cpt_sizes)
    transformed_sizes.append(cur_sum)

    # Chunks from the stopping point on are passed through uncompacted.
    transformed_sizes.extend(sorted_sizes[untouched_from:])

    return transformed_sizes, cpt_cost

In [None]:
def compute_benefit(buffer, target, chunk_factor, n_joins):
    """Return the net benefit of merging ``target`` tuples into a buffer of ``buffer`` tuples.

    The benefit is the join cost saved by processing one merged chunk instead
    of two separate ones over ``n_joins`` joins, minus the cost of copying the
    ``target`` tuples. A positive value means merging pays off. Works
    elementwise when the sizes are NumPy arrays.
    """
    saved = (compute_prc_cost(buffer, chunk_factor, n_joins)
             + compute_prc_cost(target, chunk_factor, n_joins)
             - compute_prc_cost(buffer + target, chunk_factor, n_joins))
    copy_cost = k_cpt_fixed_cost + target * k_cpt_per_tuple_cost
    return saved - copy_cost

In [None]:
# A compaction threshold is necessary: merging a chunk only pays off while the
# join cost it saves exceeds the cost of copying it. Plot that net benefit for
# every (buffer size, target size) pair that fits into one 2048-tuple chunk.

chunk_factor = 4
n_joins = 2
x_base = np.arange(1, 2048, 1)
y_add = np.arange(1, 2048, 1)

# Evaluate the cost model on the whole grid at once. compute_benefit is
# elementwise, so this replaces ~4M scalar calls with identical per-cell
# arithmetic. Cell [j, i] holds benefit(buffer=x_base[i], target=y_add[j]),
# which is the row/column layout go.Heatmap expects.
buffer_grid, target_grid = np.meshgrid(x_base, y_add)
z_benefits = np.where(
    buffer_grid + target_grid <= 2048,
    compute_benefit(buffer_grid, target_grid, chunk_factor, n_joins),
    np.nan,  # pairs that would overflow a chunk are left blank
)

# Grid cells where the benefit is (numerically) zero: the break-even curve.
y_zero, x_zero = np.where(np.abs(z_benefits) < 0.01)

fig = go.Figure(data=go.Heatmap(x=x_base, y=y_add, z=z_benefits, colorscale='ice', showlegend=False))

fig.add_trace(go.Scatter(x=x_base[x_zero], y=y_add[y_zero], mode='lines', name='z = 0', line=dict(color='black',width=2)))

fig.update_layout(
    autosize=False,
    width=500,
    height=500,
    title="Compaction Benefit",
    xaxis_title="Buffer Chunks Size",
    yaxis_title="Target Chunk Size",
)

fig.show()

~~**Strategy 3.5**: Sort all chunks in ascending order. Distribute each small chunk into the larger ones.~~

In [None]:
def alg_sort_compaction_one_hop(chunk_sizes, chunk_factor, n_joins):
    """Strategy 3.5 (struck out above; commented out in ``let_us_join``).

    Scatters small chunks into the free space of the largest chunks.

    Chunks are sorted ascending. Each chunk, smallest first, that
    ``is_small_chunk`` deems small is poured into the free space
    (2048 - size) of the chunks at the end of the sorted list. Those chunks
    are filled from the last one backwards. The scan stops at the first chunk
    that is not small, or whose tuples do not fit into the free space of the
    chunks after it.

    Args:
        chunk_sizes: non-empty sequence of non-zero chunk sizes.
        chunk_factor: unused here.
        n_joins: joins remaining; only whether it is > 0 matters.

    Returns:
        ``(transformed_sizes, cpt_cost)``. ``transformed_sizes`` holds the
        (possibly enlarged) chunks from the stopping index on. Chunks before
        that index were distributed into them and are not emitted.
    """
    assert len(chunk_sizes) > 0, "chunk_sizes must not be empty"

    sorted_sizes = sorted(chunk_sizes)
    transformed_sizes = []
    cpt_cost = 0

    def is_small_chunk(size, n_joins):
        # "Small": the fixed cost of one more join on this chunk (0 when no
        # joins remain) is larger than the cost of compacting it.
        return k_prc_fixed_cost * (n_joins > 0) > k_cpt_fixed_cost + size * k_cpt_per_tuple_cost

    i = 0
    for i in range(0, len(sorted_sizes)):
        size = sorted_sizes[i]
        assert size != 0, "size must not be zero"

        if not is_small_chunk(size, n_joins):
            break

        # check if this chunk can be distributed into other chunks:
        # walk from the last chunk down to i+1, accumulating free space until
        # it covers `size`; `j` ends at the last chunk visited.
        n_slot = 0
        j = 0
        for j in range(len(sorted_sizes) - 1, i, -1):
            n_slot += 2048 - sorted_sizes[j]
            if n_slot >= size: break

        # Not enough free space in the chunks after i: stop scanning.
        if n_slot < size:
            break;

        # compute the compaction cost (one compaction for this chunk's tuples)
        cpt_cost += compute_cpt_cost([size])

        # distribute this chunk into other chunks: chunks j+1..end are topped
        # up to 2048, and the leftover lands in chunk j. It fits because
        # n_slot >= size.
        # NOTE(review): after this, sorted_sizes is no longer guaranteed to be
        # ascending.
        assert j+1 <= len(sorted_sizes), "j+1 must be less than len(sorted_sizes)"
        k = 0
        for k in range(j+1, len(sorted_sizes)): 
            size -= 2048 - sorted_sizes[k]
            sorted_sizes[k] = 2048

        sorted_sizes[j] += size

    # Append the remaining chunks, it is not beneficial to compact them.
    # Chunks with index < i were distributed above and are intentionally omitted.
    for j in range(i, len(sorted_sizes)):
        transformed_sizes.append(sorted_sizes[j])

    return transformed_sizes, cpt_cost

### DuckDB Strategy

**Strategy 4**: Set a threshold to distinguish big chunks from small chunks; only small chunks are compacted.

In [None]:
k_block_size = 1 << 11  # maximum number of tuples in one chunk (2048)

In [None]:
def alg_binary_compaction(chunk_sizes, chunk_factor, level, threshold=128):
    """Strategy 4: compact only chunks smaller than a fixed ``threshold``.

    Chunks with ``size >= threshold`` pass through unchanged. Smaller chunks
    are buffered in arrival order. Once the buffer holds at least
    ``k_block_size - threshold`` tuples, it is emitted as one chunk and charged
    ``compute_cpt_cost``.

    Chunks still buffered when the input is exhausted are emitted as a final
    chunk and charged the same way. Without this flush their tuples would be
    lost.

    Args:
        chunk_sizes: chunk sizes produced by the previous join.
        chunk_factor, level: unused; part of the strategy signature.
        threshold: chunks smaller than this are compacted (128 by default).

    Returns:
        ``(trans_chunks, cpt_cost)``.
    """
    trans_chunks = []
    cpt_cost = 0

    cpt_chunks = []
    buffered = 0  # running total of cpt_chunks
    for size in chunk_sizes:
        if size >= threshold:
            trans_chunks.append(size)
            continue

        cpt_chunks.append(size)
        buffered += size

        if buffered >= k_block_size - threshold:
            trans_chunks.append(buffered)
            cpt_cost += compute_cpt_cost(cpt_chunks)
            cpt_chunks = []
            buffered = 0

    # Flush the partially filled buffer so that no tuples are dropped.
    if cpt_chunks:
        trans_chunks.append(buffered)
        cpt_cost += compute_cpt_cost(cpt_chunks)

    return trans_chunks, cpt_cost

**Strategy 5**: The previous strategy uses a fixed threshold of 128 for all joins. This is imprecise; instead, we can compute a better threshold for each join from the cost model.

In [None]:
def get_cpt_threhold(chunk_factor, n_joins):
    """Return the chunk size from which compaction is no longer profitable.

    Evaluates ``compute_benefit(1024, y, chunk_factor, n_joins)``, i.e. merging
    a target chunk of ``y`` tuples into a half-full (1024-tuple) buffer, for
    ``y = 1..1024``. It returns one past the largest ``y`` with a positive
    benefit.

    Callers treat ``size >= threshold`` as "big, do not compact", so every size
    up to and including the largest profitable ``y`` is compacted. Returns 0
    (compact nothing) when no joins remain or no size is profitable.
    """
    if n_joins == 0:
        return 0

    buffer_size = 1024
    # Every target fits next to the buffer (1024 + 1024 == 2048), so no pair
    # needs to be masked out.
    targets = np.arange(1, 1025, 1)
    benefits = compute_benefit(buffer_size, targets, chunk_factor, n_joins)

    profitable = np.flatnonzero(benefits > 0)
    if profitable.size == 0:
        return 0

    # Threshold is exclusive: place it one past the largest profitable size.
    return int(targets[profitable[-1]]) + 1

In [None]:
def alg_dynamic_compaction(chunk_sizes, chunk_factor, level):
    """Strategy 5: Strategy 4 with a threshold derived from the cost model.

    The threshold is ``get_cpt_threhold(chunk_factor, level)``, where ``level``
    is the number of joins still to run. It is printed on every call,
    regardless of ``simulate_join``'s ``print_log``.

    Chunks with ``size >= threshold`` pass through unchanged. Smaller chunks
    are buffered in arrival order. Once the buffer holds at least
    ``k_block_size - threshold`` tuples, it is emitted as one chunk and charged
    ``compute_cpt_cost``. Chunks still buffered at the end of the input are
    emitted and charged the same way. Without this flush their tuples would be
    lost.

    Returns:
        ``(trans_chunks, cpt_cost)``.
    """
    cpt_threshold = get_cpt_threhold(chunk_factor, level)

    print_color(f"Compaction Threshold: {cpt_threshold}")

    trans_chunks = []
    cpt_cost = 0

    cpt_chunks = []
    buffered = 0  # running total of cpt_chunks
    for size in chunk_sizes:
        if size >= cpt_threshold:
            trans_chunks.append(size)
            continue

        cpt_chunks.append(size)
        buffered += size

        if buffered >= k_block_size - cpt_threshold:
            trans_chunks.append(buffered)
            cpt_cost += compute_cpt_cost(cpt_chunks)
            cpt_chunks = []
            buffered = 0

    # Flush the partially filled buffer so that no tuples are dropped.
    if cpt_chunks:
        trans_chunks.append(buffered)
        cpt_cost += compute_cpt_cost(cpt_chunks)

    return trans_chunks, cpt_cost

## Experimental Results

In [None]:
def let_us_join(chunk_sizes, chunk_factor, num_join, print_log=True):
    """Run each enabled compaction strategy on ``chunk_sizes`` and print a report.

    The report has three sections:
      1. the cost constants and pipeline settings;
      2. whatever the strategies print while being simulated (Strategy 5
         prints its per-level thresholds here);
      3. each strategy's total, join and compaction cost, with the simulated
         costs (labelled "us") divided by 1e6 and printed as seconds.
    """
    divider = "-------------------------" * 3

    print_color("1. Settings: \n", 'red')
    for row in (
        "\t\t\t Join\t Compaction",
        f"Fixed Cost (us) \t {k_prc_fixed_cost}\t {k_cpt_fixed_cost}",
        f"Per-tuple Cost (us) \t {k_prc_per_tuple_cost}\t {k_cpt_per_tuple_cost}",
    ):
        print_color(row)
    print("\t")
    print_color(f"Chunk Factor: {chunk_factor}\nNumber of Joins: {num_join}")

    print_color(divider)
    print_color("2. Estimated Thresholds: \n", 'red')

    strategies = [
        ("No Compaction", alg_no_compaction),
        ("Full Compaction", alg_full_compaction),
        ("Sort Compaction", alg_sort_compaction),
        # ("Sort (One Hop) Compaction", alg_sort_compaction_one_hop),  # disabled
        ("Binary Compaction", alg_binary_compaction),
        ("Dynamic Compaction", alg_dynamic_compaction),
    ]
    costs = {
        name: simulate_join(chunk_sizes, strategy, chunk_factor, num_join, print_log)
        for name, strategy in strategies
    }

    print_color(divider)
    print_color("3. Results: \n", 'red')

    for name, (prc_us, cpt_us) in costs.items():
        prc_s = prc_us / 1e6
        cpt_s = cpt_us / 1e6
        print_color(f"[{name}]", 'green')
        print(f"\t Total Cost: {prc_s + cpt_s:.2f}s\tCompute Cost: {prc_s:.2f}s\t Compaction Cost: {cpt_s:.2f}s")

In [None]:
# Initial chunk distribution: about 2e7 tuples in int(2e7 / 2048) chunks.
# With scale=0 every draw equals the mean, so every chunk holds 2048 tuples.
chunk_sizes = generate_chunk_sizes(int(2e7 / 2048), mean=2048, scale=0)

In [None]:
# Normal: baseline cost constants, 3 joins each splitting chunks 16-way.
chunk_factor, num_join, print_log = 16, 3, False
let_us_join(chunk_sizes, chunk_factor, num_join, print_log)

In [None]:
# High Compaction Cost: temporarily make compaction 10x more expensive per tuple.
# The original value is saved and restored in `finally`, so an exception during
# the run cannot leave the inflated cost behind for later cells. Restoring the
# saved value also avoids relying on `x * 10 / 10 == x` in floating point.
_saved_cpt_per_tuple_cost = k_cpt_per_tuple_cost
k_cpt_per_tuple_cost *= 10
try:
    chunk_factor = 16
    num_join = 3
    print_log = False
    let_us_join(chunk_sizes, chunk_factor, num_join, print_log)
finally:
    k_cpt_per_tuple_cost = _saved_cpt_per_tuple_cost

In [None]:
# High Compaction Cost & Deep Pipeline: 10x per-tuple compaction cost, 4 joins.
# The original value is saved and restored in `finally`, so an exception during
# the run cannot leave the inflated cost behind for later cells.
_saved_cpt_per_tuple_cost = k_cpt_per_tuple_cost
k_cpt_per_tuple_cost *= 10
try:
    chunk_factor = 12
    num_join = 4
    print_log = False
    let_us_join(chunk_sizes, chunk_factor, num_join, print_log)
finally:
    k_cpt_per_tuple_cost = _saved_cpt_per_tuple_cost