# Merge Policies of Timsort and Powersort

This notebook gives fully functional “educational” implementations of the Timsort and Powersort merge policies (as I presented them at PyCon US 2023).


**The talk slides are [available here](https://speakerdeck.com/sebawild/quicksort-timsort-powersort).**

The goal of these implementations is *not* to be fast, but to illustrate the algorithms and serve as reference.

## Helper methods

`merge` is a simple merge method from the textbook.
The actual implementations in CPython (and other Timsort ports) use a galloping merge instead.

`merge_inplace` offers the interface of an in-place method by copying the merged result back into the list.
It also prints which merge was executed so one can reconstruct what happened.  
TODO: A proper interface instead of print might be nice ...

`extend_run` scans from the given position onwards to find a maximal run. Again, real implementations would enforce a minimal run length at this point, but we omit that for simplicity.

In [1]:
import math


# Global running total of the merge cost: merge_inplace adds the size (j - i)
# of every merged range to it. Reset it to 0 before each experiment.
MERGE_COST = 0

def merge(run1, run2):
    """Merge two sorted runs into a new sorted list.

    The merge is stable: when an element of ``run1`` compares equal to an
    element of ``run2``, the one from ``run1`` (the left run) comes first.
    Neither input is modified.

    Args:
        run1: The left run, sorted ascending.
        run2: The right run, sorted ascending.

    Returns:
        A new list containing all elements of both runs in sorted order.
    """
    result = []
    i = j = 0
    n1, n2 = len(run1), len(run2)
    while i < n1 and j < n2:
        # Take from the right run only if it is strictly smaller, so that
        # ties are resolved in favour of the left run (stability).
        if run2[j] < run1[i]:
            result.append(run2[j])
            j += 1
        else:
            result.append(run1[i])
            i += 1
    # At most one of the two runs still has elements left.
    result.extend(run1[i:])
    result.extend(run2[j:])
    return result


def merge_inplace(a, i, m, j):
    """Merge the adjacent runs a[i:m] and a[m:j] and store the result in a[i:j].

    As a side effect, the merge is reported on stdout and its size (j - i)
    is added to the global MERGE_COST counter.
    """
    global MERGE_COST
    print(f'Merge({i}, {m}, {j})')
    MERGE_COST += j - i
    left_run = a[i:m]
    right_run = a[m:j]
    # merge() returns a new list; copy it back over the merged range.
    a[i:j] = merge(left_run, right_run)


def extend_run(a, i):
    """Find the maximal run starting at index i and return its end index.

    A run is either weakly increasing or strictly decreasing. A strictly
    decreasing run is reversed in place, so that a[i:end] is ascending
    when this function returns.

    Returns the exclusive end index of the run.
    """
    size = len(a)
    if i == size - 1:
        # The last element forms a run of length one.
        return i + 1
    end = i + 1
    if a[i] <= a[end]:
        # Weakly increasing run: advance while the order is preserved.
        while end < size and a[end - 1] <= a[end]:
            end += 1
        return end
    # Strictly decreasing run: advance, then flip it to ascending order.
    while end < size and a[end - 1] > a[end]:
        end += 1
    a[i:end] = reversed(a[i:end])
    return end


def extend_run_increasing_only(a, i):
    """Find the maximal weakly increasing run starting at index i.

    Unlike extend_run, decreasing runs are not detected and the list is
    never modified.

    Returns the exclusive end index of the run.
    """
    size = len(a)
    for k in range(i + 1, size):
        # Stop at the first position that breaks the ascending order.
        if not a[k - 1] <= a[k]:
            return k
    # No break found: the run extends to the end of the list.
    return max(i + 1, size)


## Timsort

The Timsort merge policy is used in `timsort` to sort the given list.

In [2]:
def rule_A(W, X, Y, Z):
    """Rule A: the topmost run Z is longer than X, the third run from the top.

    Arguments are the lengths of the four topmost runs (Z is the top);
    a missing run is passed as None, in which case the rule does not apply.
    """
    return X is not None and Z > X


def rule_B(W, X, Y, Z):
    """Rule B: the topmost run Z is at least as long as the run Y below it.

    Arguments are the lengths of the four topmost runs (Z is the top);
    a missing run is passed as None, in which case the rule does not apply.
    """
    return Y is not None and Z >= Y


def rule_C(W, X, Y, Z):
    """Rule C: the two topmost runs Y and Z together are at least as long as X.

    Arguments are the lengths of the four topmost runs (Z is the top);
    a missing run is passed as None, in which case the rule does not apply.
    """
    return X is not None and Y + Z >= X


def rule_D(W, X, Y, Z):
    """Rule D: the runs X and Y together are at least as long as W.

    Arguments are the lengths of the four topmost runs (Z is the top);
    a missing run is passed as None, in which case the rule does not apply.
    """
    return W is not None and X + Y >= W


def merge_X_Y(a, runs):
    """Merge the second and third runs from the top of the run stack.

    Each stack entry is a (start, length) pair. The topmost run Z is left
    untouched; the entries for X and Y are replaced by one combined entry.
    """
    assert len(runs) >= 3
    x_start, x_len = runs[-3][0], runs[-3][1]
    y_start, y_len = runs[-2][0], runs[-2][1]
    # X and Y must be adjacent in the list.
    assert y_start == x_start + x_len
    merge_inplace(a, x_start, y_start, y_start + y_len)
    # Replace the two entries below the top by the merged run.
    runs[-3:-1] = [(x_start, x_len + y_len)]


def merge_Y_Z(a, runs):
    """Merge the two topmost runs of the run stack.

    Each stack entry is a (start, length) pair; the two top entries are
    replaced by one entry describing the merged run.
    """
    assert len(runs) >= 2
    y_start, y_len = runs[-2][0], runs[-2][1]
    z_start, z_len = runs[-1][0], runs[-1][1]
    # Y and Z must be adjacent in the list.
    assert z_start == y_start + y_len
    merge_inplace(a, y_start, z_start, z_start + z_len)
    # Replace the two topmost entries by the merged run.
    runs[-2:] = [(y_start, y_len + z_len)]


def timsort(a, extend_run=extend_run):
    """Sort the list a in place using the Timsort merge policy.

    Runs are detected left to right with extend_run and pushed onto a stack
    of (start, length) pairs. After each push, the rules A-D are checked on
    the lengths of the four topmost runs W, X, Y, Z (Z on top) and merges are
    performed until none of the rules applies. Remaining runs are merged
    from the top of the stack at the end.

    Args:
        a: The list to sort.
        extend_run: Function (a, i) -> end index of the run starting at i.
    """
    # This uses the rules as they are on the COMP526 slides, gives correct merges!
    runs = []
    pos = 0
    while pos < len(a):
        run_end = extend_run(a, pos)
        runs.append((pos, run_end - pos))
        pos = run_end
        while len(runs) >= 2:
            depth = len(runs)
            # Lengths of the topmost runs; None where the stack is too short.
            Z = runs[-1][1]
            Y = runs[-2][1]
            X = runs[-3][1] if depth >= 3 else None
            W = runs[-4][1] if depth >= 4 else None
            if rule_A(W, X, Y, Z):
                merge_X_Y(a, runs)
            elif rule_B(W, X, Y, Z) or rule_C(W, X, Y, Z) or rule_D(W, X, Y, Z):
                # Rules B, C and D all trigger the same merge.
                merge_Y_Z(a, runs)
            else:
                break
    # Input exhausted: collapse the stack from the top.
    while len(runs) >= 2:
        merge_Y_Z(a, runs)



In [3]:
# Example input 0..9, 0..9, 0, 1: three ascending runs of lengths 10, 10 and 2.
a = list(range(10))+list(range(10))+list(range(2))

In [4]:
timsort(a)

Merge(0, 10, 20)
Merge(0, 20, 22)


## Powersort

The Powersort merge policy is used in `powersort`
to sort the given list.

In [5]:
def power(run1, run2, n):
    """Compute the power of the boundary between two adjacent runs.

    With a = (i1 + n1/2) / n and b = (i2 + n2/2) / n being the midpoints of
    the two runs normalized to [0, 1), the power is the smallest l for which
    floor(a * 2**l) != floor(b * 2**l).

    The computation uses exact arithmetic on the doubled midpoints instead
    of floating-point fractions, so the result is also correct for inputs
    that are too large to be represented exactly as floats.

    Args:
        run1: The left run; run1[0] is its start index, run1[1] its length.
        run2: The right run, starting directly after run1.
        n: Total length of the list containing both runs.

    Returns:
        The power of the run boundary as an int.
    """
    i1 = run1[0]; n1 = run1[1]
    i2 = run2[0]; n2 = run2[1]
    assert i1 >= 0
    assert i2 == i1 + n1
    assert n1 >= 1 and n2 >= 1
    assert i2 + n2 <= n
    # Doubled midpoints: a = twice_a / (2n) and b = twice_b / (2n).
    twice_a = 2 * i1 + n1
    twice_b = 2 * i2 + n2
    denom = 2 * n
    l = 0
    # floor(a * 2**l) == (twice_a * 2**l) // (2n); terminates because
    # twice_b - twice_a = n1 + n2 > 0.
    while (twice_a * 2**l) // denom == (twice_b * 2**l) // denom:
        l += 1
    return l


def power_fast(run1, run2, n):
    """Compute the "power" of a run boundary using integer arithmetic only.

    Given are two adjacent runs from a list of total length n.
    See listsort.txt for details; the code follows the CPython implementation.

    With a' = i1 + n1/2 and b' = i2 + n2/2 = a' + (n1 + n2)/2 being the run
    midpoints, the loop produces the binary digits of a'/n and b'/n one at
    a time and stops at the first position where the digits differ.
    """
    start1, len1 = run1[0], run1[1]
    len2 = run2[1]
    lo = 2 * start1 + len1      # 2 * a'
    hi = lo + len1 + len2       # 2 * b'
    depth = 1
    # The digits differ exactly when lo < n <= hi; otherwise keep going.
    while lo >= n or hi < n:
        if lo >= n:
            # Both digits are 1: remove them.
            assert hi >= lo
            lo -= n
            hi -= n
        # Otherwise both digits are 0. Move on to the next digit.
        assert lo < hi < n
        lo <<= 1
        hi <<= 1
        depth += 1
    return depth


def merge_topmost_2(a, runs):
    """Merge the two topmost runs of the Powersort run stack.

    Stack entries are (start, length, power) triples. The merged run keeps
    the power stored with the lower run Y.
    """
    assert len(runs) >= 2
    lower, upper = runs[-2], runs[-1]
    # The two runs must be adjacent in the list.
    assert upper[0] == lower[0] + lower[1]
    merge_inplace(a, lower[0], upper[0], upper[0] + upper[1])
    # Replace both entries by the merged run, retaining the power of Y.
    runs[-2:] = [(lower[0], lower[1] + upper[1], lower[2])]


def powersort(a, extend_run=extend_run):
    """
    Sort a list in place using powersort.

    This is a slick variant for managing the stack
    (no need to update the power inside the stack), but it is conceptually
    not exactly the same as the CPython implementation.
    This stores the run-boundary power in the right run, CPython stores it
    in the left run.

    Args:
        a: The list to sort.
        extend_run: Function (a, i) -> end index of the run starting at i.
    """
    n = len(a)
    if n == 0:
        # Nothing to sort; extend_run must not be called on an empty list.
        return
    i = 0
    runs = []
    # The first run has no left boundary; it gets the sentinel power 0.
    j = extend_run(a, i)
    runs.append((i, j - i, 0))
    i = j
    while i < n:
        j = extend_run(a, i)
        # Power of the boundary between the topmost run and the new run.
        p = power(runs[-1], (i,j-i), n)
        # Merge until the topmost run's stored power is smaller than p.
        while p <= runs[-1][2]:
            merge_topmost_2(a, runs)
        runs.append((i,j-i,p))
        i = j
    # Input exhausted: collapse the stack from the top.
    while len(runs) >= 2:
        merge_topmost_2(a, runs)



## Some examples

We give a few examples and show how the two methods differ.

In [6]:
def fill_with_asc_runs_high_to_low(A, runLengths, runLenFactor=1):
    """Fill the list A with ascending runs of the given lengths.

    A is first filled with n, n-1, ..., 1 (n = len(A)). Then, from left to
    right, consecutive segments of length l * runLenFactor, for each l in
    runLengths, are sorted ascending.

    The sum of all lengths in runLengths times runLenFactor must equal the
    length of A.
    """
    total = len(A)
    assert sum(runLengths) * runLenFactor == total
    # Start from the strictly descending sequence total, total-1, ..., 1.
    A[:] = range(total, 0, -1)
    start = 0
    for run_length in runLengths:
        stop = start + run_length * runLenFactor
        A[start:stop] = sorted(A[start:stop])
        start = stop



The example from the slides:

In [8]:
# Run lengths of the input; the input list is built from them below.
runs = [5,3,3,14,1,2] # example from slides
#runs = [9,16,7,7,7,7,7,7,7,7,7,7]

# Allocate a list of the right length and fill it with the ascending runs.
a = list(range(sum(runs)))
fill_with_asc_runs_high_to_low(a, runs)
# Reset the global merge-cost counter before the first experiment.
MERGE_COST = 0
print('Sorting with Timsort:')
timsort(a, extend_run=extend_run_increasing_only)
print('Merge cost:', MERGE_COST)
print()
# timsort sorted `a` in place, so rebuild the same input for Powersort.
fill_with_asc_runs_high_to_low(a, runs)
MERGE_COST = 0
print('Sorting with Powersort:')
powersort(a, extend_run=extend_run_increasing_only)
print('Merge cost:', MERGE_COST)

Sorting with Timsort:
Merge(5, 8, 11)
Merge(0, 5, 11)
Merge(0, 11, 25)
Merge(25, 26, 28)
Merge(0, 25, 28)
Merge cost: 73

Sorting with Powersort:
Merge(0, 5, 8)
Merge(0, 8, 11)
Merge(25, 26, 28)
Merge(11, 25, 28)
Merge(0, 11, 28)
Merge cost: 67
