In [1]:
import os

# Set HF_HOME before huggingface_hub is imported in a later cell, so the
# library picks this directory up as its home/cache location.
# NOTE: machine-specific absolute path; adjust it for your environment.
os.environ.update(HF_HOME='/home/husein/ssd4/scicom')

In [2]:
from multiprocessing import Pool
from multiprocessing.shared_memory import SharedMemory
import numpy as np
import pyarrow.parquet as pq
from typing import List, Tuple
import math
import os

In [3]:
"""
Fully implemented multiprocess First-Fit-Decreasing (FFD) bin packer.

Design choices:
- Read Parquet with pyarrow (zero-copy where possible).
- Store numeric lengths in a shared-memory NumPy array so workers don't pickle 18M Python objects.
- Split work by index ranges (no dicts are sent to workers).
- Each worker performs a LOCAL FFD on its range and returns:
    - full_bins: list of bins (lists of indices) whose used + min_length >= MAX_SIZE, i.e. bins with at most one smallest item's worth of free space (these are kept as-is)
    - partial_items: flat list of item indices that belong to partially-filled bins
- Main process collects all full_bins and all partial_items, then runs a GLOBAL FFD *only* on partial_items.
- Final output: combined list of bins (each bin is a list of item indices). Map back to original data as needed.

This implementation is careful about memory, avoids heavy pickling, and keeps the heavy numeric array in shared memory.

Requirements: pyarrow, numpy

Run example:
    python multiprocess_ffd.py /path/to/file.parquet

"""

from multiprocessing import Pool
from multiprocessing.shared_memory import SharedMemory
import numpy as np
import pyarrow.parquet as pq
from typing import List, Tuple
import math
import os
from tqdm import tqdm

# Default bin capacity, in the same units as the item lengths.
MAX_SIZE = 10240


def _descending_order(sizes: np.ndarray) -> np.ndarray:
    """Return positions that sort ``sizes`` in descending order.

    The sort is stable, so equal sizes keep their original relative order and
    the packing is reproducible from run to run. ``sizes`` must be a signed
    integer array so that negation cannot wrap around.
    """
    return np.argsort(-sizes, kind="stable")


def _first_fit(items, sizes, max_size):
    """Pack pre-sorted items into bins using the first-fit rule.

    items: item ids, already ordered by descending size.
    sizes: size of each item, aligned with ``items``.
    max_size: bin capacity.

    Returns ``(bins, used)`` where ``bins[k]`` is the list of item ids placed
    in bin ``k`` and ``used[k]`` is the total size stored in it. Assuming
    non-negative sizes, an item larger than ``max_size`` ends up alone in its
    own over-capacity bin.
    """
    bins: List[List[int]] = []
    used: List[int] = []

    for item, size in zip(tqdm(items), sizes):
        # Linear scan: each item goes into the first bin that still has room,
        # so the cost per item grows with the number of open bins.
        for b, cap in enumerate(used):
            if cap + size <= max_size:
                bins[b].append(item)
                used[b] += size
                break
        else:
            # No existing bin can take the item: open a new one.
            bins.append([item])
            used.append(size)

    return bins, used


def local_ffd_range(args):
    """Worker: run FFD on one contiguous index range of the shared lengths array.

    args is a tuple
    ``(start, end, shm_name, n_items, dtype_str, lengths_min[, max_size])``:
      - start, end: half-open range of item indices handled by this worker
      - shm_name: name of the SharedMemory block holding all lengths
      - n_items, dtype_str: shape and dtype needed to view that block as an array
      - lengths_min: smallest length in the whole array
      - max_size: optional bin capacity; falls back to MAX_SIZE when omitted

    Returns ``(full_bins, partial_items)``:
      - full_bins: bins with ``used + lengths_min >= max_size``, kept as final
      - partial_items: flat list of the item indices from all other bins, to be
        re-packed globally by the caller
    """
    start, end, shm_name, n_items, dtype_str, lengths_min, *extra = args
    max_size = int(extra[0]) if extra else MAX_SIZE
    lengths_min = int(lengths_min)

    shm = SharedMemory(name=shm_name)
    try:
        arr = np.ndarray((n_items,), dtype=np.dtype(dtype_str), buffer=shm.buf)
        # Private int64 copy of this worker's slice: nothing below touches the
        # shared block, and negating it for the sort cannot wrap.
        sizes = arr[start:end].astype(np.int64)
        del arr
    finally:
        # Always release this process's attachment to the shared block.
        shm.close()

    order = _descending_order(sizes)
    # Convert to plain Python ints once; the packing loop is pure Python.
    items = (order + start).tolist()
    bins, used = _first_fit(items, sizes[order].tolist(), max_size)

    full_bins = []
    partial_items = []
    for members, cap in zip(bins, used):
        # A bin counts as full once its free space is at most the smallest
        # item length in the dataset.
        if cap + lengths_min >= max_size:
            full_bins.append(members)
        else:
            partial_items.extend(members)

    return full_bins, partial_items


def pack_ffd_indices(indices: List[int], lengths_array: np.ndarray, max_size: int = MAX_SIZE) -> List[List[int]]:
    """Global FFD taking item indices and using lengths_array for sizes.

    indices: item indices to pack (positions in ``lengths_array``).
    lengths_array: size of every item, indexed by item index.
    max_size: bin capacity.

    Returns bins as a list of lists of indices; an empty ``indices`` yields an
    empty list. Items of equal length keep their input order.
    """
    if len(indices) == 0:
        return []

    item_ids = np.asarray(indices, dtype=np.int64)
    # One vectorized lookup instead of a NumPy scalar access per item.
    sizes = np.asarray(lengths_array)[item_ids].astype(np.int64)
    order = _descending_order(sizes)

    bins, _ = _first_fit(item_ids[order].tolist(), sizes[order].tolist(), max_size)
    return bins


def parallel_ffd(lengths: np.ndarray, cores: int = None, max_size: int = MAX_SIZE) -> List[List[int]]:
    """Top-level parallel FFD.

    lengths: NumPy 1D array of integers (np.int32 or compatible)
    cores: number of worker processes; defaults to all CPUs but one
    max_size: bin capacity, applied to both the per-worker and the global pass

    The array is copied into shared memory and split into contiguous index
    ranges, one job per range. Each worker packs its own range; bins the
    workers report as full are kept, and all remaining items are re-packed
    together in this process.

    Returns final bins as lists of item indices (empty list for empty input).
    Raises ValueError if ``lengths`` is not 1-D or ``cores`` is below 1.
    """
    lengths = np.asarray(lengths)
    if lengths.ndim != 1:
        raise ValueError("lengths must be a 1-D array")

    n = lengths.shape[0]
    if n == 0:
        # Nothing to pack; min() and a zero-byte shared block would both fail.
        return []

    if cores is None:
        # os.cpu_count() may return None when the count cannot be determined.
        cores = max(1, (os.cpu_count() or 1) - 1)
    if cores < 1:
        raise ValueError("cores must be >= 1")
    # There are never more jobs than items, so extra workers would sit idle.
    cores = min(cores, n)

    lengths_min = int(lengths.min())
    dtype = lengths.dtype
    shm = SharedMemory(create=True, size=lengths.nbytes)
    try:
        arr = np.ndarray(lengths.shape, dtype=dtype, buffer=shm.buf)
        arr[:] = lengths

        print('done sharedmemory')

        per = math.ceil(n / cores)
        # max_size travels with every job so workers honor the caller's
        # capacity instead of the module default.
        jobs = [
            (start, min(n, start + per), shm.name, n, str(dtype), lengths_min, max_size)
            for start in range(0, n, per)
        ]

        with Pool(cores) as p:
            results = p.map(local_ffd_range, jobs)

        full_bins_all: List[List[int]] = []
        partial_items_all: List[int] = []

        for full_bins, partial_items in results:
            full_bins_all.extend(full_bins)
            partial_items_all.extend(partial_items)

        partial_bins = pack_ffd_indices(partial_items_all, arr, max_size=max_size)

        return full_bins_all + partial_bins

    finally:
        # Cleanup shared memory
        shm.close()
        shm.unlink()

In [4]:
from huggingface_hub import hf_hub_download

# Fetch the parquet file from the dataset repo into the current directory;
# `f` is what hf_hub_download returns and is passed to pd.read_parquet below.
f = hf_hub_download(
    "Scicom-intl/sort-multilingual-tts",
    "sort-merge.parquet",
    repo_type="dataset",
    local_dir="./",
)

In [5]:
import pandas as pd

# Load the downloaded table and preview its first five rows.
df = pd.read_parquet(path=f)
df.head(n=5)

Unnamed: 0,f,l,i
0,tokenized-4k-qwen3/tokenized-0,130,0
1,tokenized-4k-qwen3/tokenized-0,324,1
2,tokenized-4k-qwen3/tokenized-0,77,2
3,tokenized-4k-qwen3/tokenized-0,174,3
4,tokenized-4k-qwen3/tokenized-0,110,4


In [6]:
# Item sizes for the packer: column 'l' as a flat int32 array, in row order.
lengths = df.loc[:, 'l'].to_numpy(dtype='int32')

In [13]:
# Pack every item using 25 worker processes; each bin is a list of row positions.
bins = parallel_ffd(lengths=lengths, cores=25)

In [8]:
# Total number of bins produced by the packing run.
len(bins)

614552

In [9]:
# Peek at the first bin: a list of item indices (positions in `lengths`).
bins[0]

[669100, 673184, 669097, 669098, 673186, 669099, 673185, 9680]

In [15]:
# Bin entries are positions in `lengths`, which was built from df['l'] in row
# order, so positional .iloc maps them back to the original rows.
df.iloc[bins[0]]

Unnamed: 0,f,l,i
669100,tokenized-4k-qwen3/tokenized-1,1719,205023
673184,tokenized-4k-qwen3/tokenized-1,1613,209107
669097,tokenized-4k-qwen3/tokenized-1,1511,205020
669098,tokenized-4k-qwen3/tokenized-1,1415,205021
673186,tokenized-4k-qwen3/tokenized-1,1293,209109
669099,tokenized-4k-qwen3/tokenized-1,1288,205022
673185,tokenized-4k-qwen3/tokenized-1,1240,209108
9680,tokenized-4k-qwen3/tokenized-0,161,9680


In [11]:
import json

# Persist the packing (a list of lists of row positions) as a single JSON document.
with open('bin-packing.json', mode='w') as fopen:
    fopen.write(json.dumps(bins))

In [16]:
# !hf upload Scicom-intl/sort-multilingual-tts bin-packing.json --repo-type=dataset