# Parquet Content-Defined Chunking

Apache Parquet is a columnar storage format that is widely used in the data engineering community. 

As Hugging Face hosts nearly 11PB of datasets with Parquet files alone accounting for over 2.2PB of that storage, optimizing Parquet storage is of high priority.
Hugging Face has introduced a new storage layer called [Xet](https://huggingface.co/blog/xet-on-the-hub) that leverages content-defined chunking to efficiently deduplicate chunks of data reducing storage costs and improving download/upload speeds.

While Xet is format agnostic, Parquet's layout and column-chunk (data page) based compression can produce entirely different byte-level representations for data with minor changes, leading to suboptimal deduplication performance. To address this, the Parquet files should be written in a way that minimizes the byte-level differences between similar data, which is where content-defined chunking (CDC) comes into play.

In [144]:
import hashlib
import wikipedia
from fastcdc import fastcdc
from dataclasses import dataclass
from IPython.display import display, HTML


@dataclass
class Chunk:
    """Represents a single chunk of data."""
    hash: str
    data: bytes
    first_seen_in: int = 0


class Store(dict):
    """
    Simple in-memory content-addressable store for CDC chunks.
    """

    @classmethod
    def from_bytes(cls, data: bytes):
        """
        Apply FastCDC to the input data and create a map of hashed chunks.
        """
        chunks = fastcdc(
            data,
            fat=True, 
            hf=hashlib.sha256, 
            min_size=64, 
            avg_size=256, 
            max_size=1024
        )
        store = cls()
        for chunk in chunks:
            store[chunk.hash] = Chunk(hash=chunk.hash, data=chunk.data)
        return store
    
    @classmethod
    def merge(cls, stores):
        """
        Merge multiple chunk stores into a single store while updating 
        each chunk's first_seen_in attribute for later visualization.
        """
        merged = cls()
        for index, store in enumerate(stores):
            for chunk in store.chunks:
                if chunk.hash not in merged:
                    merged[chunk.hash] = Chunk(
                        hash=chunk.hash,
                        data=chunk.data,
                        first_seen_in=index
                    )
        return merged
    
    @property
    def chunks(self):
        return list(self.values())
    
    @property
    def num_chunks(self):
        return len(self)
    
    @property
    def overall_size(self):
        return sum(len(chunk.data) for chunk in self.chunks)

    def visualize(self):
        """
        Visualize the store as a colored strip in ipython.
        Each chunk's width is proportional to its size.
        """
        colors = ['#00FF00', '#FF0000', '#0000FF', '#FFFF00', '#FF00FF', '#00FFFF']
        html = '<div style="display: flex; flex-direction: row; width: 100%;">'
        total_size = self.overall_size
        for chunk in self.chunks:
            color = colors[chunk.first_seen_in % len(colors)]
            width_percent = 100 * len(chunk.data) / total_size if total_size else 0
            html += f'<div style="background-color: {color}; height: 20px; width: {width_percent}%;"></div>'
        html += '</div>'
        html += '<p>Overall size: {} bytes</p>'.format(self.overall_size)
        html += '<p>Number of chunks: {}</p>'.format(self.num_chunks)
        display(HTML(html))

    
def cdc(data):
    """Store data in a content-addressable store using content-defined chunking."""
    if isinstance(data, bytes):
        return Store.from_bytes(data)
    elif isinstance(data, str):
        data = data.encode('utf-8')
        return Store.from_bytes(data)
    elif isinstance(data, list):
        return Store.merge([cdc(item) for item in data])
    else:
        raise TypeError("Unsupported data type for chunking: {}".format(type(data)))

In [148]:
article = wikipedia.page("Apache Arrow").content
edited_article = "Apache Arrow is a very cool project!\n" + article

In [150]:
cdc(edited_article).summary()

AttributeError: 'Store' object has no attribute 'summary'

In [151]:
cdc([article, edited_article, article]).visualize()

In [138]:
cdc(edited_article).chunks[:3]

[Chunk(hash='34a17202faaef45473b7846d5964909e3008e7ee91667a15da7344d7b3b78e5b', data=b'Apache Arrow is a very cool project!\nApache Arrow is a language-agnostic software framework for developing data analytics applications that ', first_seen_in=0),
 Chunk(hash='57f2fb4d30543d0c53b07398e8a2733a0656fad4de122ac8940d59f5c7edf40d', data=b'process columnar data. It contains a standardized column-oriented memory format that is able to represent flat and hierarchical data for efficient analyti', first_seen_in=0),
 Chunk(hash='f2e85785416f5787b77b8b7c70c1096c3511e752118fd81f39bcc0f701d80524', data=b'c operations on modern CPU and GPU hardware. This reduces or eliminates factors that limit the feasibility of working with large sets of data, such as the cost, vol', first_seen_in=0)]

In [86]:
cdc([article, edited_article])

{'03e91020d386b54235013ed9112eb4906ff1f13e1c4cbb0fc5ddcfc161cf7c40': 'Apache Arrow is a language-agnostic software framework for developing data analytics applications that ',
 '57f2fb4d30543d0c53b07398e8a2733a0656fad4de122ac8940d59f5c7edf40d': 'process columnar data. It contains a standardized column-oriented memory format that is able to represent flat and hierarchical data for efficient analyti',
 'f2e85785416f5787b77b8b7c70c1096c3511e752118fd81f39bcc0f701d80524': 'c operations on modern CPU and GPU hardware. This reduces or eliminates factors that limit the feasibility of working with large sets of data, such as the cost, vol',
 '8e70c62081aeb4ead6d4d501418182ce4fa84ec6101eeadf9ab4e1519c102f86': 'atility, or physical constraints of dynamic random-access memory.\n\n\n== Interoperability ==\nArrow can be used with Apache Parquet, Apache Spark, NumPy, PySpark, pandas and other data processing libraries.\nThe project includes native software libraries written in',
 'e593034ec59e9b2beae

In [59]:
chunk([edited_article])

{'7f8cf0933e7e94188a5524021cbc02c1a3f7da933fb7ef33a079cd452bb874d3': 'Apache Parquet\nApache Parquet is a free and open-source column-oriented data storage format in the Apache Hadoop ecosystem. It is similar to RCFile and ORC, the other columnar-storage file formats in Hadoop, and is compatible with most of the data processing frameworks around Hadoo',
 '5e1be5561bd23e3ef658c5a5941a1d70749c74e2bb7b78310cf81a86fa35cd81': 'p. It provides efficient data compression and encoding schemes with enhanced performance to handle complex data in bulk.\n\n\n== History ==\nThe open-source project to build Apache Parquet began as a joint effort between Twitter and Cloudera. Parquet was designed as an improvement ',
 '68fe3ba35cdcbe968c3660174e4678846f3576e312f83b2b3b78cc6374d16b8f': 'on the Trevni columnar storage format created by Doug Cutting, the creator of Hadoop. The first version, Apache Parquet 1.0, was released in July 2013. Since April 27, 2015, Apache Parquet has been a top-level Apache So

In [56]:
print(chunk([article, article]))

Store(num_chunks=19, overall_size=4760 bytes)


In [40]:
chunk([article, edited_article]).overall_size

5709

In [49]:
chunk([article, article]).num_chunks

19

In [27]:
article_chunks.keys() - edited_article_chunks.keys()

{'4e27765a44b02d50897d0dad6710bcae3e0aa1ca32fb55c3478aa73d56e14756',
 '52c4d8a7cc5549d7f1dd22102e26a410993fab646a40e16bcf3d3256cb018e56'}

In [28]:
edited_article_chunks.keys() - article_chunks.keys()

{'132c9e09e260e41593f46791f98f323870b470b5e0aec53ce97250fc8d91ab94',
 '4332334b32e7cccad653422c3b8d5fa99f282c0dcd417cba3223c1f67fe263e2',
 '7f8cf0933e7e94188a5524021cbc02c1a3f7da933fb7ef33a079cd452bb874d3'}

In [None]:
## Content-Defined Chunking (CDC)

TODO: briefly explain what CDC is and how it works through an example, show a negative example on a parquet file


To address this, we introduced a new feature in the Apache Parquet C++ implementation that allows users to write Parquet files in a way that minimizes the number changed data pages 

Content-defined chunking (CDC) is a technique that divides data into variable-sized chunks based on the content of the data itself, rather than fixed-size blocks. 

In [1]:
import pathlib
import glob

import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.parquet as pq

from de import estimate
from datasets import load_dataset

In [2]:
ds = load_dataset("wikimedia/wikipedia", "20231101.en", split="train")

Resolving data files:   0%|          | 0/41 [00:00<?, ?it/s]

Loading dataset shards:   0%|          | 0/41 [00:00<?, ?it/s]

In [3]:
ds.to_parquet("wiki.parquet", use_content_defined_chunking=False)

Creating parquet from Arrow format:   0%|          | 0/7 [00:00<?, ?ba/s]

20200062385

In [4]:
table = pq.read_table("wiki.parquet")

In [None]:
table = table[:100_000]

In [27]:
print(len(table))
table = table[:100_000]

6407814


In [28]:
def write_dataset(table, base_dir, num_shards, **kwargs):
    # ensure that directory exists
    base_dir = pathlib.Path(base_dir)
    base_dir.mkdir(parents=True, exist_ok=True)

    rows_per_file = len(table) / num_shards
    for i in range(num_shards):
        start = i * rows_per_file
        end = min((i + 1) * rows_per_file, len(table))
        shard = table.slice(start, end - start)
        path = base_dir / f"part-{i}.parquet"
        pq.write_table(shard, path, **kwargs)

In [None]:
def estimate_deduplication_ratio(*patterns):
    # find all parquet files matching the patterns
    paths = sum([glob.glob(pattern) for pattern in patterns], [])
    total_size, deduped_size, compressed_deduped_size = estimate(paths)
    print(f"Total size: {total_size / (1024 * 1024):.2f} MB")
    print(f"Deduped size: {deduped_size / (1024 * 1024):.2f} MB")
    print(f"Compressed deduped size: {compressed_deduped_size / (1024 * 1024):.2f}")
    return deduped_size / total_size
    

In [30]:
write_dataset(table, "shard1", num_shards=1, use_content_defined_chunking=False)
write_dataset(table, "shard4", num_shards=4, use_content_defined_chunking=False)

In [32]:
write_dataset(table, "shard1", num_shards=1, use_content_defined_chunking=True)
write_dataset(table, "shard4", num_shards=4, use_content_defined_chunking=True)

In [23]:
write_dataset(table, "shard20", num_shards=20, use_content_defined_chunking=True)

In [33]:
estimate_deduplication_ratio("shard1/*.parquet", "shard4/*.parquet")

Total size: 536.72 MB
Deduped size: 275.84 MB
Compressed deduped size: 276.64


0.515424550609722

In [None]:
estimate_deduplication_ratio("shard8/*.parquet", "shard20/*.parquet")

['shard8/part-2.parquet', 'shard8/part-3.parquet', 'shard8/part-1.parquet', 'shard8/part-0.parquet', 'shard8/part-5.parquet', 'shard8/part-4.parquet', 'shard8/part-6.parquet', 'shard8/part-7.parquet', 'shard20/part-18.parquet', 'shard20/part-2.parquet', 'shard20/part-11.parquet', 'shard20/part-10.parquet', 'shard20/part-3.parquet', 'shard20/part-19.parquet', 'shard20/part-8.parquet', 'shard20/part-12.parquet', 'shard20/part-1.parquet', 'shard20/part-0.parquet', 'shard20/part-13.parquet', 'shard20/part-9.parquet', 'shard20/part-16.parquet', 'shard20/part-5.parquet', 'shard20/part-4.parquet', 'shard20/part-17.parquet', 'shard20/part-6.parquet', 'shard20/part-15.parquet', 'shard20/part-14.parquet', 'shard20/part-7.parquet']
Total size: 22203.23 MB
Deduped size: 11201.79 MB
Compressed deduped size: 11233.67


0.5059472759226442

In [8]:
mini = table[:100000]

In [11]:
pq.write_table(mini, "mini.parquet")

In [14]:
pq.write_table(mini[:50_000], "nocdc1.parquet", use_content_defined_chunking=False)
pq.write_table(mini[50_000:], "nocdc2.parquet", use_content_defined_chunking=False)
pq.write_table(mini[:50_000], "cdc1.parquet", use_content_defined_chunking=True)
pq.write_table(mini[50_000:], "cdc2.parquet", use_content_defined_chunking=True)
pq.write_table(mini, "cdc.parquet", use_content_defined_chunking=True)
pq.write_table(mini, "nocdc.parquet", use_content_defined_chunking=False)

In [26]:
mini_ = pa.concat_tables([mini[30_000:], mini[:30_000]])

In [27]:
pq.write_table(mini_, "cdc_.parquet", use_content_defined_chunking=True)
pq.write_table(mini_, "nocdc_.parquet", use_content_defined_chunking=False)

In [29]:
pq.write_table(mini_, "cdc__.parquet", use_content_defined_chunking=True, row_group_size=20_000)
pq.write_table(mini_, "nocdc__.parquet", use_content_defined_chunking=False, row_group_size=20_000)

In [2]:
ds = load_dataset("HuggingFaceTB/finemath", "finemath-4plus", split="train[:1_000_000]")

Resolving data files:   0%|          | 0/128 [00:00<?, ?it/s]

Resolving data files:   0%|          | 0/64 [00:00<?, ?it/s]

In [11]:
ds.to_parquet("finemath.parquet", use_content_defined_chunking=False)

Creating parquet from Arrow format:   0%|          | 0/1 [00:00<?, ?ba/s]

5822002066

In [12]:
table = pq.read_table("finemath.parquet")

In [14]:
!ls -lh finemath.parquet

-rw-r--r--  1 kszucs  staff   2.6G Jul  7 11:03 finemath.parquet


In [13]:
len(table)

1000000

### Add a new column