# Scalable and versatile databasing for ML

Here we introduce Duckdb, an in-process analytical database, and Parquet, a columnar file format. Together they can be used to store and query data in a scalable and versatile way. We will show how to use them in Python and how to integrate them with other tools like Pandas.

## Iris dataset in Pandas: recap

In [None]:
# define path prefix for work directory as it might differ based on the environment
from pathlib import Path
WORK = Path(".")

In [None]:
import pandas as pd
from sklearn import datasets

In [None]:
def sklearnds2df(ds):
    df = pd.DataFrame(data=ds.data, columns=ds.feature_names)
    df['target'] = pd.Series(pd.Categorical.from_codes(ds.target,
                                                       categories=ds.target_names))
    return df

In [None]:
iris_ds = datasets.load_iris()
iris = sklearnds2df(iris_ds)
iris.columns = iris.columns.str.removesuffix(" (cm)").str.replace(" ","_")
iris

### Case study: Two datasets with features to be combined

A very common use-case in dataset preparation for ML is to combine features from two datasets into a single one.

We simulate two datasets, one with the length and width features of the sepal, and another with the features of the iris petal, two different parts of the Iris flower. We then want to combine them into a single dataset.

In [None]:
iris_sepals = iris[['sepal_length','sepal_width','target']]
iris_petals = iris[['petal_length','petal_width','target']]
iris_sepals

In [None]:
pd.merge(iris_sepals, iris_petals, how='inner', on='target')

`pandas.merge()` corresponds to a JOIN operation in SQL, and therefore needs a unique shared identifier on which to merge. As we can see, attempting to merge blocks of rows for the same category is naïve and won't yield the desired result.
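
To make the row explosion concrete, here is a minimal sketch using two hypothetical toy tables (`left` and `right`, not part of the notebook) with two rows per category. The same effect on Iris, with 50 rows per species, yields 3 × 50 × 50 = 7,500 rows instead of 150. The `validate` argument of `pandas.merge()` can be used to turn this silent mistake into an error.

```python
import pandas as pd

# Toy stand-ins for the sepal and petal tables: two rows per category.
left = pd.DataFrame({"target": ["a", "a", "b", "b"],
                     "sepal_length": [5.1, 4.9, 7.0, 6.4]})
right = pd.DataFrame({"target": ["a", "a", "b", "b"],
                      "petal_length": [1.4, 1.3, 4.7, 4.5]})

# Joining on a non-unique key pairs every left row with every right row
# of the same category: 2 x 2 rows for each of the 2 categories.
merged = pd.merge(left, right, how="inner", on="target")
print(len(merged))

# validate= makes pandas check that the key is unique on both sides
# and raise a MergeError if it is not.
try:
    pd.merge(left, right, how="inner", on="target", validate="one_to_one")
    key_is_unique = True
except pd.errors.MergeError:
    key_is_unique = False
print(key_is_unique)
```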

In Pandas, dataframes have an index, which can serve this purpose:

In [None]:
pd.merge(iris_sepals, iris_petals, left_index=True, right_index=True, how='inner')

But not all dataframe implementations have an index. In fact, ones built for high performance and scalability, like Polars, don't have an index:

In [None]:
import polars as pl

def sklearnds2pl(ds):
    df = pl.DataFrame(data=ds.data, schema=[col.replace(" (cm)","") for col in ds.feature_names])
    targets = [ds.target_names[i] for i in ds.target]
    return df.with_columns(pl.Series(targets).cast(pl.Categorical).alias('target'))
sklearnds2pl(iris_ds)

And in a more realistic scenario, the features in the two datasets may not be named distinctly. Let's create this case and give each dataset a unique (specimen or plant) ID column.

In [None]:
def create_col_subset(df, prefix):
    dfs = df.filter(regex=f"^{prefix}")
    dfs = pd.concat([dfs, df.select_dtypes(include='category')], axis=1)
    # build the IDs from the dataframe passed in, not from the global `iris`
    dfs.index = [f"P{i:03d}" for i in df.index.values]
    dfs.columns = dfs.columns.str.removeprefix(prefix)
    return dfs.reset_index(names='ID')
iris_sepals = create_col_subset(iris, 'sepal_')
iris_petals = create_col_subset(iris, 'petal_')
iris_sepals

In [None]:
pd.merge(iris_sepals, iris_petals, on='ID', how='inner', suffixes=('_sepal','_petal'))

The point isn't that this isn't possible with Pandas. The point is that we are using the wrong tool for the job. Pandas is great for data manipulation, but it's not a database. It's not designed to store and query data efficiently. It's designed to manipulate data efficiently. _(Adapted from a Copilot autocompletion, which says something about how common this problem is.)_

## The database way: Duckdb

In [None]:
import duckdb

In [None]:
db = duckdb.from_df(iris)

In [None]:
(type(db), db.shape, db.columns)

In [None]:
duckdb.sql("select * from db limit 5")

In [None]:
duckdb.sql("select min(sepal_width), max(sepal_width) from db")

In [None]:
duckdb.sql("select target, avg(sepal_length), avg(sepal_width) from db group by target")

### Duckdb can access dataframes directly

In [None]:
duckdb.sql("select target, avg(sepal_length), avg(sepal_width) from iris group by target")

In [None]:
duckdb.sql("select s.length as sepal_length, s.width as sepal_width "
           "from iris_sepals as s "
           "JOIN iris_petals USING (ID)")

In [None]:
duckdb.sql("select s.ID as sID, p.ID as pID, s.length as sepal_length, s.width as sepal_width, "
           "p.length as petal_length, p.width as petal_width, p.target as species "
           "from iris_sepals as s join iris_petals as p on (s.ID = p.ID) "
           "order by species")

### Duckdb can read online datasets

#### [HuggingFace](https://hf.co) (HF)

- One of the most widely used ML commons for models and datasets, if not the most widely used
- Datasets are easy to use, and there are a lot of them.

Duckdb can read datasets directly from HF. (In fact, Pandas can, too.) We can start with the [Iris dataset from scikit-learn](https://hf.co/datasets/scikit-learn/iris/) on HF.

In [None]:
duckdb.read_csv("https://huggingface.co/datasets/scikit-learn/iris/resolve/main/Iris.csv")

In [None]:
duckdb.sql("select * from read_csv('https://huggingface.co/datasets/scikit-learn/iris/resolve/main/Iris.csv') where species = 'Iris-setosa'")

#### fsspec and HF

In [None]:
import fsspec
# just like a local file system ('file'), but with 'hf' for huggingface
fsspec.filesystem('hf').ls('hf://datasets/scikit-learn/iris')

In [None]:
duckdb.read_csv("hf://datasets/scikit-learn/iris/Iris.csv")

### Parquet format

Parquet is a columnar storage format optimized for reading and writing very large datasets in chunks. It is fast and efficient, both in terms of storage and in terms of reading and writing, and it supports querying and filtering data efficiently without having to read (and thus download) the entire dataset.

Major features include:
- Hybrid between columnar and row-oriented storage: rows are split into row groups, within which data is stored in columns.
- Compression: column data is stored compressed, and common writers such as `pyarrow` and Duckdb apply Snappy compression by default.
- Metadata: Parquet stores metadata about the dataset, which can be used to optimize queries.
- Partitioning: Parquet can be partitioned, which can make queries faster when only a subset of the data needs to be read.

[![Michael Berk, Demystifying the Parquet File Format](https://miro.medium.com/v2/resize:fit:1024/format:webp/1*QEQJjtnDb3JQ2xqhzARZZw.png)](https://towardsdatascience.com/demystifying-the-parquet-file-format-13adb0206705)

The following resources are useful for learning more about Parquet:
- [Parquet documentation](https://parquet.apache.org/)
- [Demystifying the Parquet File Format](https://towardsdatascience.com/demystifying-the-parquet-file-format-13adb0206705)
- [Understanding the Parquet File Format: A Comprehensive Guide](https://medium.com/@siladityaghosh/understanding-the-parquet-file-format-a-comprehensive-guide-b06d2c4333db)

#### Parquet on HF

All datasets on HF have an auto-converted Parquet version. The dataset viewer on HF uses this. _(For larger datasets that aren't natively in Parquet format, the auto-converted Parquet version may be truncated.)_

We can read the Iris dataset directly from HF in Parquet format.

In [None]:
pdb = duckdb.read_parquet("hf://datasets/scikit-learn/iris@~parquet/default/train/0000.parquet")
pdb

In [None]:
duckdb.sql("select * from pdb where species = 'Iris-setosa'")

In [None]:
duckdb.sql("select * "
           "from read_parquet('hf://datasets/scikit-learn/iris@~parquet/default/train/0000.parquet') "
           "where species = 'Iris-setosa'")

### Parquet databases are very scalable

As an example, a very large dataset is [PD12M](https://huggingface.co/datasets/Spawning/PD12M). It is 12.4M rows in total, sharded into many Parquet files, each of which is about 19MB in size.

In [None]:
# opening one Parquet file
rel = duckdb.read_parquet('hf://datasets/Spawning/PD12M/metadata/pd12m.000.parquet')
rel.columns

In [None]:
duckdb.sql("select count(*) from rel")

In [None]:
# opening all Parquet files, i.e., the entire dataset
rel = duckdb.read_parquet('hf://datasets/Spawning/PD12M/metadata/*.parquet')

We can immediately inspect the schema of the dataset:

In [None]:
rel.columns

In [None]:
duckdb.sql("select count(*) from rel")

In [None]:
duckdb.sql("select mime_type, count(*) from rel "
           "where width > 1024 and height > 768 "
           "group by mime_type")

In [None]:
duckdb.sql("select * from rel where width > 1024 and height > 768 and mime_type = 'image/gif'")

### Writing and reading Parquet

We can use duckdb to write the Iris dataset to Parquet:

In [None]:
duckdb.from_df(iris).to_parquet(f'{WORK}/iris.parquet')

But current versions of Pandas can do so as well:

In [None]:
iris.to_parquet(WORK / 'iris-pd.parquet', index=False)

#### Writing/reading a partitioned Parquet dataset

The "pedestrian" way to write a Parquet file is to use the `pyarrow` library. This is in fact the library that Pandas uses under the hood by default to read and write Parquet files.

The `partition_cols` argument is used to write the dataset in a partitioned format. This can allow for very efficient querying of the data if the filtering by a query aligns with the partitioning. (Note that Pandas accepts this parameter as well.)

In [None]:
import pyarrow as pa
import pyarrow.parquet as pq

table = pa.Table.from_pandas(iris)
pq.write_to_dataset(table, root_path=WORK / 'iris', partition_cols=['target'],
                    existing_data_behavior='delete_matching')

The by-column value partitioning scheme is common and is called hive-style partitioning.

Both Pandas and Duckdb can read partitioned Parquet datasets, but the arguments differ slightly.

In [None]:
pd.read_parquet(path=WORK / 'iris', partitioning='hive')

In [None]:
duckdb.read_parquet(f'{WORK}/iris/**/*.parquet', hive_partitioning=True)

Note that with hive partitioning, the partitioned columns are not included in the Parquet files. They are only present in the directory structure. (Though their metadata is present in the Parquet files.)

In [None]:
duckdb.read_parquet(f'{WORK}/iris/target=setosa/*.parquet')

#### Query Performance

For filtering, aggregation, and similar queries, Duckdb and Parquet achieve performance that can rival or significantly exceed that of a traditional dataframe library such as Pandas.

In [None]:
%%time
for i in range(50_000):
    # duckdb.sql() only builds a lazy relation; fetchall() forces the query to run
    res = duckdb.sql(f"select * from read_parquet('{WORK}/iris.parquet') where target = 'setosa'").fetchall()

In [None]:
%%time
for i in range(50_000):
    res = iris.query("target == 'setosa'")

In [None]:
%%time
for i in range(50_000):
    res = iris.loc[iris.target == 'setosa',]

In [None]:
iris_large = iris.sample(n=100_000, replace=True)
iris_large.to_parquet(f'{WORK}/iris_large.parquet')
iris_large.to_csv(f'{WORK}/iris_large.csv', index=False)

In [None]:
%%time
for i in range(10_000):
    # duckdb.sql() only builds a lazy relation; fetchall() forces the query to run
    res = duckdb.sql(f"select * from read_parquet('{WORK}/iris_large.parquet') "
                     "where target = 'setosa'").fetchall()

In [None]:
%%time
for i in range(10_000):
    res = iris_large.loc[iris_large.target == 'setosa',]

In [None]:
%%time
dbfile = f'{WORK}/iris_large.parquet'
for i in range(30_000):
    # duckdb.sql() only builds a lazy relation; fetchall() forces the query to run
    res = duckdb.sql(f"select target, count(*) from read_parquet('{dbfile}') group by target").fetchall()


In [None]:
%%time
for i in range(30_000):
    res = iris_large.groupby('target', observed=True).count()

There are also different algorithms available for compression (e.g. Gzip, Brotli, Zstd) and encoding (e.g. Delta, RLE, PLAIN, DICT). These could further optimize query performance and storage efficiency for specific use-cases.

#### Appending to a Parquet dataset

In keeping with their OLAP orientation, Parquet datasets are not designed for mutability: rows can't be updated or deleted in place.

However, although rows can't simply be appended to a Parquet _file_, new rows _can_ be appended to a Parquet _dataset_, which can consist of multiple files:

- We can simply write new rows to a new Parquet file and make sure it is included in the file glob passed to the `read_parquet()` function. (Usually this means it should be in the same directory as the existing Parquet files.)
- We can also add new rows to an existing partitioned Parquet dataset. To keep the existing files and prevent overwriting them, we need to use a combination of the `basename_template` and `existing_data_behavior` arguments.

Let's say our data arrives not all at once but in 3 separate batches:

In [None]:
iris_shuffled = iris.sample(frac=1, replace=False)
iris_b1 = iris_shuffled.iloc[:50]
iris_b2 = iris_shuffled.iloc[50:100]
iris_b3 = iris_shuffled.iloc[100:]

Write the first batch (assuming we don't have the others yet):

In [None]:
pq.write_to_dataset(pa.Table.from_pandas(iris_b1, preserve_index=False),
                    root_path=f'{WORK}/iris-b', partition_cols=['target'],
                    basename_template='b1-{i}.parquet',
                    existing_data_behavior='overwrite_or_ignore')

Then we can keep appending batches as we get them (or write all batches at once if we have them):

In [None]:
for b, batch in enumerate([iris_b2, iris_b3], start=2):
    pq.write_to_dataset(pa.Table.from_pandas(batch, preserve_index=False),
                        root_path=f'{WORK}/iris-b', partition_cols=['target'],
                        basename_template=f'b{b}'+'-{i}.parquet',
                        existing_data_behavior='overwrite_or_ignore')

How we query the dataset remains the same, regardless of whether the data was written all at once or in batches:

In [None]:
duckdb.sql(f"select * from read_parquet('{WORK}/iris-b/*/*.parquet', hive_partitioning=True)")

Changing the schema of a Parquet dataset (such as by adding columns) is difficult. One practical approach is to create separate Parquet files for each batch with a consistent schema, then use `duckdb.sql()` to combine them (for example with UNION ALL and NULL placeholders for the missing columns, or with some kind of OUTER JOIN), and write the result back to a Parquet dataset.

Say, our first batch doesn't have the petal measurements:

In [None]:
iris_b1.filter(regex="(sepal|target).*").to_parquet(f'{WORK}/iris-c1.parquet', index=False)

Then combine with the other two datasets:

In [None]:
rel = duckdb.sql("select * from iris_b2 UNION ALL "
                 "select * from iris_b3 UNION ALL "
                 "select sepal_length, sepal_width, null, null, target " +
                 f"from read_parquet('{WORK}/iris-c1.parquet')")
rel

In [None]:
rel.to_parquet(f'{WORK}/iris-c.parquet')
# or alternatively as a partitioned dataset:
pq.write_to_dataset(rel.arrow(), root_path=f'{WORK}/iris-c', partition_cols=['target'],
                    existing_data_behavior='delete_matching')

In [None]:
duckdb.sql(f"select * from read_parquet('{WORK}/iris-c.parquet')")

In [None]:
duckdb.sql("select target, "
           "count(*) as num_rows, count(petal_length) as num_petal_measures "
           f"from read_parquet('{WORK}/iris-c.parquet') "
           "group by target")

In [None]:
duckdb.sql(f"select * from read_parquet('{WORK}/iris-c/*/*.parquet', hive_partitioning=True)")