# Recreating and Improving MiniPile Dataset Creation

**Objectives:**
- [.] Implement and verify MiniPile’s filtering pipeline according to [Kaddour (2023)](https://arxiv.org/abs/2304.08442), but intended for decoder-only model use
- [ ] Evaluate and compare performances of Pythia $160\text{M}$ pretrained on The Pile vs. trained on the *newly self-created MiniPile* on MMLU and ARC-Challenge
- [ ] Improve the dataset creation process and create a new SuperMiniPile dataset (ideally smaller and more information-retaining)
- [ ] Train Pythia $160\text{M}$ on SuperMiniPile, evaluate on MMLU and ARC-Challenge
- [ ] Evaluate and compare performances of Pythia $1.4\text{B}$ pretrained on The Pile vs. trained on the created MiniPile on the MMLU and ARC benchmarks

In [None]:
#! pip install sentence-transformers

In [7]:
import os
import numpy as np
from tqdm import tqdm
from pathlib import Path
from datasets import load_dataset, Dataset
from sklearn.cluster import MiniBatchKMeans
from sklearn.preprocessing import normalize
from transformers import AutoTokenizer, AutoModel
from huggingface_hub import snapshot_download
from sentence_transformers import SentenceTransformer

In [2]:
import time

base_dir = "/mnt/data"
base_path = Path(base_dir)

def download_model(down_dir: str, target_folder: str, cache_folder: str, repo_id: str, branch: str = "main",
                   max_retries: int = 5, backoff_s: float = 10.0) -> None:
    down_dir = Path(down_dir)
    target_dir = down_dir / target_folder
    cache_dir = down_dir / cache_folder

    os.makedirs(target_dir, exist_ok=True)
    os.makedirs(cache_dir, exist_ok=True)

    print(f"Downloading {repo_id}/{branch}...")

    # Retry transient failures, but give up after max_retries instead of looping forever
    for attempt in range(1, max_retries + 1):
        try:
            snapshot_download(
                repo_id,
                repo_type="model",
                revision=branch,
                cache_dir=str(cache_dir),
                local_dir=str(target_dir)
            )
            return
        except Exception as e:
            print(f"Download attempt {attempt}/{max_retries} failed: {e}")
            if attempt == max_retries:
                raise
            time.sleep(backoff_s * attempt)  # linear backoff before the next attempt

---

## Recreating The MiniPile Dataset Creation Pipeline

The MiniPile paper builds the dataset in three steps:

(1) document embedding extraction,<br>
(2) clustering of embeddings, and<br>
(3) human-guided exclusion of unwanted clusters

- 22 data subset sources
- 5.91 KiB mean document size (before deduplication)

### Document Embedding Extraction

- The MiniPile paper uses the term "document". Given how large these documents are, I assume it refers to individual training examples from "The Pile Deduplicated"
- "The Pile Deduplicated" predominantly contains English text, as stated in the Pile paper
- `E5-Large` does not require sentence-splitting beforehand; I was misled by the example code at https://huggingface.co/intfloat/e5-large
- I will use `E5-Large` with one "sentence" being one MiniPile "document"
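Embedding whole documents means that anything beyond the model's input window is truncated rather than split: `SentenceTransformer` cuts inputs at its `max_seq_length`, which I take to be 512 tokens for `E5-Large` (per its model card; treat this as an assumption). A rough back-of-the-envelope estimate, additionally assuming about 4 bytes of English text per subword token (a common heuristic, not measured for this tokenizer), shows how much of a mean-sized document the embedding actually sees:

```python
# Rough estimate of how much of a mean-sized document fits into E5-Large's input window.
# Assumptions (not measured): 512-token limit, ~4 bytes of English text per token.
MEAN_DOC_KIB = 5.91       # mean document size noted above (before deduplication)
MAX_TOKENS = 512          # assumed E5-Large maximum sequence length
BYTES_PER_TOKEN = 4.0     # assumed heuristic for English subword tokenization

def covered_fraction(doc_kib: float, max_tokens: int = MAX_TOKENS,
                     bytes_per_token: float = BYTES_PER_TOKEN) -> float:
    """Fraction of a document (by estimated token count) that fits into the input window."""
    est = doc_kib * 1024 / bytes_per_token
    return min(1.0, max_tokens / est)

est_tokens = MEAN_DOC_KIB * 1024 / BYTES_PER_TOKEN
print(f"~{est_tokens:.0f} tokens per mean document, ~{covered_fraction(MEAN_DOC_KIB):.0%} embedded")
```

Under these assumptions only roughly the first third of an average document contributes to its embedding. Whether that matters for cluster quality would need checking on a sample, and the mean after deduplication may differ.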

In [3]:
# Starting point is the deduplicated The Pile
# Infer embeddings for all documents using E5-Large

# https://huggingface.co/intfloat/e5-large
download_model(down_dir=base_dir, target_folder="e5-large", 
               cache_folder="e5-large_Cache",
               repo_id="intfloat/e5-large") # Chose this because nothing beyond E5-Large was specified

e5_large = SentenceTransformer(str(base_path / "e5-large"), local_files_only=True) # no .from_pretrained() here

# https://huggingface.co/datasets/EleutherAI/the_pile_deduplicated
pile_dedup = load_dataset("parquet",
                          data_files={
                              "train": str(base_path / "Pile_Deduplicated" / "data" / "train-*.parquet"),
                          },
                          cache_dir=str(base_path / "MiniPile_Cache"),
                          split="train",
                          streaming=True)

Downloading intfloat/e5-large/main...



Given the model and the locally stored copy of The Pile Deduplicated (loaded in streaming mode), we iterate through the dataset and extract an embedding for each document.<br>
For convenience and later processing, we assemble these embeddings into a separate embedding dataset.

When creating the embedding dataset, however, we must make sure that the embedding indices match the document indices in the original dataset.<br>
This is strictly necessary for the filtering step later on.<br>
To verify that the resulting embedding dataset is correctly aligned with the original dataset, I ran the following check on a small subset of `16384` documents:

```python
saved_embeddings = load_dataset("parquet", data_files=str(embd_dir / "shard_*.parquet"), split="train")

# Walk the original stream and the saved embeddings in lockstep; calling pile_dedup.skip(index)
# per index re-reads the stream from the beginning every time, which is quadratic in the subset size
pairs = zip(pile_dedup, saved_embeddings)  # zip stops after the shorter one, i.e. after len(saved_embeddings)
for index, (document, saved_row) in enumerate(tqdm(pairs, total=len(saved_embeddings), desc="Verifying embeddings")):
    # Newly embed the text at this index
    generated_embedding = e5_large.encode(document['text'], show_progress_bar=False)
    # Newly encoded embedding should correspond to the saved embedding at this index -> index consistency
    if not np.allclose(generated_embedding, saved_row['embedding'], atol=1e-5):
        print(f"Mismatch found at index: {index}")
```
No mismatches were found, which means we can scale the embedding set creation to the full dataset.
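The check above relies on row order alone. A cheap way to make the alignment explicit (an alternative, not what I used for the actual run) is to store each document's global position next to its embedding. The sketch below uses a hypothetical `fake_encode` stand-in for `e5_large.encode` so it runs without the model; `embed_with_index` and `check_alignment` are likewise hypothetical helpers:

```python
import numpy as np

def fake_encode(texts: list[str], dim: int = 8) -> np.ndarray:
    """Hypothetical stand-in for e5_large.encode: deterministic pseudo-embeddings per text."""
    out = np.empty((len(texts), dim), dtype=np.float32)
    for i, t in enumerate(texts):
        rng = np.random.default_rng(sum(t.encode()))  # seed derived from the text bytes
        out[i] = rng.standard_normal(dim)
    return out

def embed_with_index(texts: list[str], batch_size: int) -> list[dict]:
    """Embed texts in batches and keep the global document index next to each embedding."""
    rows = []
    for start in range(0, len(texts), batch_size):
        embs = fake_encode(texts[start:start + batch_size])
        rows.extend({"doc_idx": start + j, "embedding": e} for j, e in enumerate(embs))
    return rows

def check_alignment(texts: list[str], rows: list[dict]) -> list[int]:
    """Return document indices whose stored embedding does not match a fresh encoding."""
    return [r["doc_idx"] for r in rows
            if not np.allclose(fake_encode([texts[r["doc_idx"]]])[0], r["embedding"], atol=1e-5)]

docs = [f"document number {i}" for i in range(10)]
rows = embed_with_index(docs, batch_size=4)
print(check_alignment(docs, rows))
```

With an explicit `doc_idx`, later steps can join embeddings back to documents by key rather than by position, so a reordered or missing shard cannot silently shift every subsequent index.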

I now had a bit of a problem.<br>
Embedding "The Pile Deduplicated" with `E5-Large` will foreseeably require *a lot* of time and compute.<br>
However, the raw embeddings will likely be needed at multiple stages: for recreating the MiniPile dataset, and potentially as the basis for the SuperMiniPile improvement.<br>
If an error occurred during a combined embedding-and-clustering run, recovering from it would be complicated.
Therefore, I see more flexibility in creating the embedding dataset *separately* and then using it for clustering and filtering.
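Keeping the embedding step separate also makes it resumable after a crash. A minimal sketch, assuming shards are written in order with a fixed `shard_size` and named `shard_{index:09d}.parquet` as in the script below; `resume_state` is a hypothetical helper, not part of the script I ran:

```python
import tempfile
from pathlib import Path

def resume_state(embd_dir: Path, shard_size: int) -> tuple[int, int]:
    """Hypothetical helper: return (next_shard_index, documents_to_skip) from the shards on disk.

    Assumes shards are written in order and every existing shard is complete with exactly
    shard_size embeddings, i.e. the run stopped before the final partial shard was written.
    """
    n_done = len(list(embd_dir.glob("shard_*.parquet")))
    return n_done, n_done * shard_size

# Demo with empty placeholder files instead of real parquet shards
with tempfile.TemporaryDirectory() as tmp:
    demo_dir = Path(tmp)
    for i in range(3):
        (demo_dir / f"shard_{i:09d}.parquet").touch()
    state = resume_state(demo_dir, shard_size=1024 ** 2)

print(state)  # → (3, 3145728)
```

The embedding loop would then start with `shard_index = next_shard_index` and stream `pile_dedup.skip(documents_to_skip)`. A crash while a shard is being written can leave a truncated file, so checking that the last shard loads before resuming would be prudent.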

This is the code I tested with the verification snippet above and ultimately used to build the `Pile_Deduplicated_Embeddings` dataset:

In [None]:
# Took the example code from the intfloat/e5-large page
embd_dir = base_path / Path("Pile_Deduplicated_Embeddings")
embd_dir.mkdir(exist_ok=True)

batch_size = 1024
shard_size = batch_size ** 2  # shard embed count upper bound

embedding_shard = []
shard_index = 0

def save_shard(embeddings, output_dir, shard_index):
    shard_path = output_dir / f"shard_{shard_index:09d}.parquet"
    dataset = Dataset.from_dict({"embedding": embeddings})
    dataset.to_parquet(str(shard_path))

# Didn't know tqdm could be used like that
for batch_idx, batch in tqdm(enumerate(pile_dedup.iter(batch_size=batch_size))):
    batch_embds = e5_large.encode(batch['text'], show_progress_bar=False) # True is handy for debugging, but clutters the output
    embedding_shard.extend(batch_embds)
    
    if len(embedding_shard) >= shard_size:
        save_shard(embedding_shard, embd_dir, shard_index)
        shard_index += 1
        embedding_shard = []

# Save the remaining (partial) shard
if embedding_shard:
    save_shard(embedding_shard, embd_dir, shard_index)

I ran this code as the script `03_embed_pile_dedup.py`.
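For sizing, here is a quick estimate of what one shard holds in memory before it is written, assuming `E5-Large` produces 1024-dimensional float32 embeddings (the hidden size of its BERT-large backbone). `shard_bytes` is a hypothetical helper for the arithmetic:

```python
BATCH_SIZE = 1024
SHARD_SIZE = BATCH_SIZE ** 2   # embeddings per shard, as in the script above
EMBED_DIM = 1024               # assumed E5-Large embedding dimension
BYTES_PER_FLOAT = 4            # float32

def shard_bytes(n_embeddings: int, dim: int = EMBED_DIM, bytes_per_value: int = BYTES_PER_FLOAT) -> int:
    """Raw size of n_embeddings dense vectors, ignoring Python and Arrow object overhead."""
    return n_embeddings * dim * bytes_per_value

print(f"{shard_bytes(SHARD_SIZE) / 2**30:.1f} GiB per shard")  # → 4.0 GiB per shard
```

The list of per-row NumPy arrays in `embedding_shard` adds overhead on top of this, and `Dataset.from_dict` builds an in-memory Arrow table from it, so peak memory can be noticeably higher than the raw figure. A smaller `shard_size` is a cheap knob if memory gets tight.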

### Clustering of Embeddings

- "Batchified $k$-means clustering" is a term I have only seen in the MiniPile paper; I assume it stands for **mini-batch $k$-means clustering**
- Cosine distance between normalized embeddings
- Cluster count of $k=220$ ($10$ clusters for each of the $22$ sources)
- Batch size $16384$ (I double-checked once again: yes, MiniPile uses "The Pile Deduplicated"; this is slowly getting to me)
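Normalizing only the inputs (as the `CosineMiniBatchKMeans` class below does) still assigns points by Euclidean distance to centroids that are not unit-length, which only approximates cosine clustering. For comparison, here is a minimal NumPy sketch of mini-batch *spherical* $k$-means, a swapped-in variant that the MiniPile paper does not specify: centroids are re-normalized after every update, so assignment is exactly by cosine similarity. The class name, the farthest-point initialization, and the toy data are all my own assumptions:

```python
import numpy as np

def l2_normalize(X: np.ndarray) -> np.ndarray:
    return X / np.clip(np.linalg.norm(X, axis=1, keepdims=True), 1e-12, None)

class SphericalMiniBatchKMeans:
    """Hypothetical sketch: mini-batch k-means with unit-norm centroids (assignment = max cosine)."""

    def __init__(self, n_clusters: int, seed: int = 0):
        self.n_clusters = n_clusters
        self.rng = np.random.default_rng(seed)
        self.centroids = None
        self.counts = None

    def partial_fit(self, X: np.ndarray) -> "SphericalMiniBatchKMeans":
        X = l2_normalize(np.asarray(X, dtype=np.float64))
        if self.centroids is None:
            # Farthest-point initialization on the first batch (a simple stand-in for k-means++)
            chosen = [int(self.rng.integers(len(X)))]
            for _ in range(1, self.n_clusters):
                closest_sim = np.max(X @ X[chosen].T, axis=1)  # similarity to the nearest chosen seed
                chosen.append(int(np.argmin(closest_sim)))
            self.centroids = X[chosen].copy()
            self.counts = np.zeros(self.n_clusters)
        labels = np.argmax(X @ self.centroids.T, axis=1)  # assignment by cosine similarity
        for c in np.unique(labels):
            members = X[labels == c]
            self.counts[c] += len(members)
            lr = len(members) / self.counts[c]  # count-based per-center learning rate
            self.centroids[c] = (1 - lr) * self.centroids[c] + lr * members.mean(axis=0)
        self.centroids = l2_normalize(self.centroids)  # project centroids back onto the unit sphere
        return self

    def predict(self, X: np.ndarray) -> np.ndarray:
        return np.argmax(l2_normalize(X) @ self.centroids.T, axis=1)

    def cosine_distance(self, X: np.ndarray, labels: np.ndarray) -> np.ndarray:
        return 1.0 - np.sum(l2_normalize(X) * self.centroids[labels], axis=1)

# Toy data: two directions plus noise; rescaling vectors must not change cosine clusters
rng = np.random.default_rng(42)
a = np.array([1.0, 0.0, 0.0]) + 0.05 * rng.standard_normal((200, 3))
b = np.array([0.0, 1.0, 0.0]) + 0.05 * rng.standard_normal((200, 3))
X = np.vstack([a * rng.uniform(0.5, 5.0, size=(200, 1)), b])
km = SphericalMiniBatchKMeans(n_clusters=2, seed=0)
for batch in np.array_split(rng.permutation(X), 8):
    km.partial_fit(batch)
labels = km.predict(X)
distances = km.cosine_distance(X, labels)
```

Whether exact spherical assignment changes the resulting clusters noticeably on `E5-Large` embeddings is untested here; it would be one candidate tweak for SuperMiniPile.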

In [None]:
import os
import numpy as np

from tqdm import tqdm
from pathlib import Path
from typing import Optional, Tuple
from datasets import load_dataset, Dataset
from sklearn.cluster import MiniBatchKMeans
from sklearn.preprocessing import normalize

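class CosineMiniBatchKMeans(MiniBatchKMeans):
    # scikit-learn's incremental API is the public partial_fit (there is no _partial_fit to override).
    # L2-normalizing the inputs makes Euclidean k-means run on the unit sphere, approximating cosine clustering.
    def partial_fit(self, X: np.ndarray, y: Optional[np.ndarray] = None,
                    sample_weight: Optional[np.ndarray] = None) -> "CosineMiniBatchKMeans":
        X_normalized = normalize(X, norm='l2', axis=1)
        return super().partial_fit(X_normalized, y, sample_weight)
    
    def predict(self, X: np.ndarray) -> np.ndarray:
        X_normalized = normalize(X, norm='l2', axis=1)
        return super().predict(X_normalized)

def cluster_embd_batch(batch_embeddings: np.ndarray, kmeans: CosineMiniBatchKMeans,
                       training: bool = True) -> Tuple[Optional[np.ndarray], Optional[np.ndarray]]:
    if training:
        # partial_fit returns the estimator itself, not cluster labels, so there is nothing to assign yet
        kmeans.partial_fit(batch_embeddings)
        return None, None
    cluster_ids = kmeans.predict(batch_embeddings)
    # Cosine similarity between each normalized embedding and its (re-normalized) assigned centroid
    cosine_sim = np.sum(normalize(batch_embeddings, norm='l2', axis=1) * normalize(kmeans.cluster_centers_, norm='l2', axis=1)[cluster_ids], axis=1)
    return cluster_ids, 1.0 - cosine_sim  # cosine distance = 1 - cosine similarity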

base_dir = "/mnt/data"
embd_dir = Path(base_dir) / "Pile_Deduplicated_Embeddings"
output_dir = Path(base_dir) / "Pile_Deduplicated_Clusters"
output_dir.mkdir(exist_ok=True)

k_clusters = 220
batch_size = 16384

embeddings = load_dataset("parquet",
                          data_files=str(embd_dir / "shard_*.parquet"),
                          split="train",
                          streaming=True)

kmeans = CosineMiniBatchKMeans(n_clusters=k_clusters,
                               batch_size=batch_size,
                               random_state=42,
                               init='k-means++',
                               max_iter=100,  # only used by fit(); partial_fit() ignores it
                               verbose=1)

# First pass: Train the clustering model
print("Training clustering model...")
for batch in tqdm(embeddings.iter(batch_size=batch_size)):
    batch_embeddings = np.array(batch['embedding'])
    _ = cluster_embd_batch(batch_embeddings, kmeans, training=True)

# Second pass: Assign clusters and compute distances
print("\nAssigning clusters and computing distances...")

current_shard = []
shard_size = batch_size * 16 # Adjust based on available memory
shard_idx = 0

for batch in tqdm(embeddings.iter(batch_size=batch_size)):
    batch_embeddings = np.array(batch['embedding'])
    cluster_ids, distances = cluster_embd_batch(batch_embeddings,
                                                kmeans,
                                                training=False)
    
    current_shard.extend([{"embedding": emb,
                           "cluster_id": int(cid),
                           "centroid_distance": float(dist)} for emb, cid, dist in zip(batch_embeddings, cluster_ids, distances)])
    
    if len(current_shard) >= shard_size:
        shard_path = output_dir / f"clustered_shard_{shard_idx:09d}.parquet"
        Dataset.from_list(current_shard).to_parquet(str(shard_path))
        current_shard = []
        shard_idx += 1

# Save remaining
if current_shard:
    shard_path = output_dir / f"clustered_shard_{shard_idx:09d}.parquet"
    Dataset.from_list(current_shard).to_parquet(str(shard_path))

# Save the learned centroids (the estimator object itself is not persisted)
np.save(output_dir / "cluster_centroids.npy",
        kmeans.cluster_centers_)
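The clustered shards feed step (3), the human-guided exclusion of unwanted clusters. A minimal sketch of how that review could be prepared, assuming the `cluster_id` and `centroid_distance` columns written above have been loaded into arrays whose row index equals the document index: list the documents closest to each centroid for manual inspection, then drop every document in a hand-picked set of cluster IDs. `closest_per_cluster`, `keep_mask`, the default `n=5`, and the toy values are hypothetical:

```python
import numpy as np

def closest_per_cluster(cluster_ids: np.ndarray, distances: np.ndarray, n: int = 5) -> dict[int, np.ndarray]:
    """Map each cluster ID to the row indices of its n documents closest to the centroid."""
    result = {}
    for c in np.unique(cluster_ids):
        rows = np.flatnonzero(cluster_ids == c)
        result[int(c)] = rows[np.argsort(distances[rows], kind="stable")[:n]]
    return result

def keep_mask(cluster_ids: np.ndarray, excluded: set[int]) -> np.ndarray:
    """Boolean mask selecting documents whose cluster was not excluded by the reviewer."""
    return ~np.isin(cluster_ids, list(excluded))

# Toy values standing in for the cluster_id / centroid_distance columns
cluster_ids = np.array([0, 1, 0, 2, 1, 0])
distances = np.array([0.30, 0.10, 0.05, 0.20, 0.40, 0.15])
print(closest_per_cluster(cluster_ids, distances, n=2))
print(keep_mask(cluster_ids, excluded={1}))
```

Keeping review and filtering as separate functions means the excluded-cluster set can be revised after inspection without recomputing anything.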

---

## Improve the Dataset Creation Process

Ideas:

- DBSCAN-like clustering for a better focus on outliers and for generalizability of the approach beyond "The Pile Deduplicated"
- weighted $k$-means to account for different subset sizes and complexities
- "Findings indicate that it is not the proportion of tokens occupied by high-utility data that aids acquisition, but rather the proportion of training steps assigned to such data" [On the effect of curriculum learning with developmental data for grammar acquisition (Opper et al., 2023)](https://aclanthology.org/2023.conll-babylm.pdf)
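The weighted $k$-means idea could start from per-document sample weights. A minimal sketch, assuming a source-subset label is available for each document (for the deduplicated Pile it may have to be recovered separately): inverse-frequency weights give every source the same total weight, so large subsets no longer dominate centroid updates. `source_weights` and the example labels are hypothetical, and weighting by "complexity" is left open:

```python
import numpy as np

def source_weights(sources: list[str]) -> np.ndarray:
    """Inverse-frequency weights: every source contributes the same total weight (summing to 1)."""
    labels, inverse, counts = np.unique(np.asarray(sources), return_inverse=True, return_counts=True)
    return 1.0 / (counts[inverse] * len(labels))

sources = ["Pile-CC", "Pile-CC", "Pile-CC", "ArXiv", "Github"]
w = source_weights(sources)
print(np.round(w, 3))
```

scikit-learn's `MiniBatchKMeans.partial_fit` accepts such per-sample weights through its `sample_weight` argument, so the weights of each batch could be passed alongside the embeddings.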

- https://openreview.net/pdf?id=7D5EECbOaf9
- https://arxiv.org/pdf/2406.03057
- https://arxiv.org/pdf/2210.15809
- https://arxiv.org/pdf/2204.08499
- https://arxiv.org/pdf/2303.09540
- https://arxiv.org/pdf/2308.12284