In [2]:
import pyarrow.parquet as pq
import pandas as pd
import pyarrow as pa

## Create a Parquet file

How can we create a Parquet file in Python?

Let's start from a pandas DataFrame.

In [2]:
df = pd.DataFrame(
    {
        'one': [-1, 0, 2.5],
        'two': ['foo', 'bar', 'baz'],
        'three': [True, False, True]
    },
    index=list('abc')
)

To write it to disk, we rely on the Apache Arrow _specification_:


> Apache Arrow was born from the need for a **set of standards** around tabular data representation and interchange between systems. The adoption of these standards reduces computing costs of data serialization/deserialization and implementation costs across systems implemented in different programming languages.

In Python, we can use PyArrow, the Python implementation of the Arrow specifications.

In [4]:

# pyarrow.Table object
# The PyArrow Table type is not part of the Apache Arrow specification, but is
# rather a tool to help with wrangling multiple record batches and array pieces
# as a single logical dataset. As a relevant example, we may receive multiple
# small record batches in a socket stream, then need to concatenate them into
# contiguous memory for use in NumPy or pandas. The Table object makes this
# efficient without requiring additional memory copying.
table = pa.Table.from_pandas(df)

# https://arrow.apache.org/docs/python/generated/pyarrow.parquet.write_table.html#pyarrow.parquet.write_table
pq.write_table(table, 'example.parquet')

Let's inspect the Parquet metadata of the file we just wrote.

In [9]:
pq.read_metadata('example.parquet')

<pyarrow._parquet.FileMetaData object at 0x12f8c4220>
  created_by: parquet-cpp-arrow version 18.0.0
  num_columns: 4
  num_rows: 3
  num_row_groups: 1
  format_version: 2.6
  serialized_size: 2572

What is metadata? See
https://parquet.apache.org/docs/file-format/metadata/
We have just read the `FileMetaData`:

> The file metadata is described by the `FileMetaData` structure. This file metadata provides offset and size information useful when navigating the Parquet file. 

![Parquet Metadata](docs/FileFormat.gif)

Let's take a larger file, available from the Stats NZ downloads page:

https://www.stats.govt.nz/large-datasets/csv-files-for-download/
https://www.stats.govt.nz/assets/Uploads/New-Zealand-business-demography-statistics/New-Zealand-business-demography-statistics-At-February-2024/Download-data/geographic-units-by-industry-and-statistical-area-2000-2024-descending-order.zip

In [3]:
df_large = pd.read_csv('./geographic-units-by-industry-and-statistical-area-2000-2024-descending-order/geographic-units-by-industry-and-statistical-area-2000-2024-descending-order-february-2024.csv')
print(df_large.head())
print(df_large.shape)

  anzsic06     Area  year  geo_count  ec_count
0        A  A100100  2024         87       200
1        A  A100200  2024        135       210
2        A  A100301  2024          6        35
3        A  A100400  2024         54        35
4        A  A100500  2024         51        95
(6751326, 5)


We write this table to a Parquet file as well.

In [None]:
pq.write_table(pa.Table.from_pandas(df_large), 'example_large.parquet')

Let's dig deeper into the file metadata.

In [10]:
parquet_file = pq.ParquetFile('example_large.parquet')
parquet_file.metadata

<pyarrow._parquet.FileMetaData object at 0x13500f6f0>
  created_by: parquet-cpp-arrow version 18.0.0
  num_columns: 5
  num_rows: 6751326
  num_row_groups: 6752
  format_version: 2.6
  serialized_size: 3260180

## Is Parquet 100% columnar?

What are row groups?
(Reference: https://blog.det.life/i-spent-8-hours-learning-parquet-heres-what-i-discovered-97add13fb28f)

Traditional row-wise formats store data as records, one after another, much like a database table. This format is intuitive and works well when accessing entire records frequently. However, it can be inefficient when dealing with analytics, where you often only need specific columns from a large dataset.

![row storage](./docs/row-storage.webp)

Columnar formats address this issue by storing data in columns instead of rows. This means that when you need specific columns, you can read only the data you need, significantly reducing the amount of data scanned.

![columnar storage](./docs/columnar-storage.webp)

However, simply storing data in a columnar format has some downsides. A record **write or update** requires touching **multiple column segments**, resulting in numerous **I/O** operations. This can significantly slow down writes, especially when dealing with large datasets.

In addition, when queries involve multiple columns, the database system must reconstruct the records from separate columns. The cost of this reconstruction increases with the **number of columns** involved in the query.

![row groups](./docs/row-groups.webp)

Parquet therefore uses a hybrid layout. The format groups data into "row groups", each containing a subset of rows (a horizontal partition). Within each row group, the data for each column is stored as a "column chunk" (a vertical partition).



## Page header

There is one further, more granular level of metadata that we will not cover in detail: the `PageHeader`.

The page header metadata is stored with the page data and includes information such as value encoding, definition encoding, and repetition encoding. In addition to the data values, Parquet also stores definition and repetition levels to handle nested data. The application uses the page header to **read and decode** the data.

In [17]:
## Let's look at the row group metadata

parquet_file.metadata.row_group(0)

<pyarrow._parquet.RowGroupMetaData object at 0x135078630>
  num_columns: 5
  num_rows: 1000
  total_byte_size: 15617
  sorting_columns: ()

In [16]:
parquet_file.metadata.row_group(0).column(2)

<pyarrow._parquet.ColumnChunkMetaData object at 0x1135e53f0>
  file_offset: 0
  file_path: 
  physical_type: INT64
  num_values: 1000
  path_in_schema: year
  is_stats_set: True
  statistics:
    <pyarrow._parquet.Statistics object at 0x13500f790>
      has_min_max: True
      min: 2024
      max: 2024
      null_count: 0
      distinct_count: None
      num_values: 1000
      physical_type: INT64
      logical_type: None
      converted_type (legacy): NONE
  compression: SNAPPY
  encodings: ('PLAIN', 'RLE', 'RLE_DICTIONARY')
  has_dictionary_page: True
  dictionary_page_offset: 5607
  data_page_offset: 5631
  total_compressed_size: 99
  total_uncompressed_size: 95

Why are these statistics useful?
Let's try a basic count.

In [8]:
df_large = pd.read_csv('./geographic-units-by-industry-and-statistical-area-2000-2024-descending-order/geographic-units-by-industry-and-statistical-area-2000-2024-descending-order-february-2024.csv')
print(df_large.count())

anzsic06     6751326
Area         6751326
year         6751326
geo_count    6751326
ec_count     6751326
dtype: int64


In [7]:
parquet_file = pq.ParquetFile('example_large.parquet')
row_count = parquet_file.metadata.num_rows
print(f"Total number of rows: {row_count}")
# The row count comes from the footer metadata; no column data is read.

Total number of rows: 6751326


Something more sophisticated: count the rows per year.

In [15]:
df_large = pd.read_csv('./geographic-units-by-industry-and-statistical-area-2000-2024-descending-order/geographic-units-by-industry-and-statistical-area-2000-2024-descending-order-february-2024.csv')
count_per_year = df_large.groupby('year').size()
print(count_per_year)

year
2000    247192
2001    245753
2002    246695
2003    250130
2004    259121
2005    263223
2006    265095
2007    266518
2008    267211
2009    268547
2010    267549
2011    267954
2012    268210
2013    267911
2014    270514
2015    273210
2016    275177
2017    277515
2018    278822
2019    281313
2020    282621
2021    283621
2022    290242
2023    293312
2024    293870
dtype: int64


What if we avoid pandas and use only the PyArrow engine on the Parquet file?

In [31]:
tbl = pq.read_table('example_large.parquet')
count_per_year = tbl.group_by('year').aggregate([('year', 'count')])
print(count_per_year)

pyarrow.Table
year: int64
year_count: int64
----
year: [[2024,2020,2019,2015,2014,...,2022,2007,2012,2021,2002]]
year_count: [[293870,282621,281313,273210,270514,...,290242,266518,268210,283621,246695]]


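Have we been lucky? Let's time both approaches over 10 runs. Note that the first loop also pays for parsing the CSV, while the second reads the Parquet file.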

In [12]:
import timeit

execution_times = []

for _ in range(10):
    start_time = timeit.default_timer()
    
    # pandas: parse the CSV, then count the rows per year
    df_large = pd.read_csv('./geographic-units-by-industry-and-statistical-area-2000-2024-descending-order/geographic-units-by-industry-and-statistical-area-2000-2024-descending-order-february-2024.csv')
    count_per_year = df_large.groupby('year').size()
    
    end_time = timeit.default_timer()
    execution_time = end_time - start_time
    execution_times.append(execution_time)

average_execution_time = sum(execution_times) / len(execution_times)
print(f"Average execution time: {average_execution_time} seconds")

Average execution time: 0.8749954289989545 seconds


In [32]:
import timeit

execution_times = []

for _ in range(10):
    start_time = timeit.default_timer()
    
    # PyArrow: read the Parquet file, then count the rows per year
    tbl = pq.read_table('example_large.parquet')
    count_per_year = tbl.group_by('year').aggregate([('year', 'count')])

    end_time = timeit.default_timer()
    execution_time = end_time - start_time
    execution_times.append(execution_time)

average_execution_time = sum(execution_times) / len(execution_times)
print(f"Average execution time: {average_execution_time} seconds")


Average execution time: 0.04289828349428717 seconds
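
The two timing loops share the same structure, so they could be folded into a small helper. The following is a sketch only: `time_call` is a hypothetical name, and the workload passed to it is a placeholder rather than the real CSV or Parquet read.

```python
import statistics
import timeit


def time_call(func, repeats=10):
    """Run func `repeats` times; return (best, median) wall-clock seconds."""
    # number=1 means each measurement is a single call to func.
    durations = timeit.repeat(func, number=1, repeat=repeats)
    return min(durations), statistics.median(durations)


# Placeholder workload; swap in the pandas or PyArrow code to compare them.
best, median = time_call(lambda: sum(range(100_000)), repeats=5)
print(f"best={best:.6f}s median={median:.6f}s")
```

Reporting the best and median times alongside the average makes the comparison less sensitive to a single slow run, for example the first one, which may include cold file-system caches.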


Looks promising. We now understand a bit more about:
- the metadata of a Parquet file
- how data is stored in row groups and column chunks
- why this is relevant for performance

We learned the power of metadata, but does this metadata make the overall file larger?

Let's have a look.

In [27]:
import os

file_size = os.path.getsize('example_large.parquet') / (1024 * 1024)
print(f"File size: {file_size:.2f} MB")


file_size = os.path.getsize('./geographic-units-by-industry-and-statistical-area-2000-2024-descending-order/geographic-units-by-industry-and-statistical-area-2000-2024-descending-order-february-2024.csv') / (1024 * 1024)
print(f"File size: {file_size:.2f} MB")

File size: 18.72 MB
File size: 139.39 MB


The Parquet file (18.72 MB) is actually about 7 times smaller than the CSV (139.39 MB). That's good news.

## Partitioning

Next, let's use more features for writing Parquet files. The improvements we obtained so far were low-hanging fruit. We now look at how partitioning works.

We need to use the [write_dataset](https://arrow.apache.org/docs/python/generated/pyarrow.dataset.write_dataset.html#pyarrow.dataset.write_dataset) function of PyArrow.

In [33]:
df_large = pd.read_csv('./geographic-units-by-industry-and-statistical-area-2000-2024-descending-order/geographic-units-by-industry-and-statistical-area-2000-2024-descending-order-february-2024.csv')
arrow_table = pa.Table.from_pandas(df_large)

import pyarrow.dataset as ds

ds.write_dataset(
    arrow_table,
    "./example_large_partitioned",
    format='parquet',
    partitioning=ds.partitioning(pa.schema([("year", pa.int32())]))
)

In [35]:
%%bash
ls example_large_partitioned

2000
2001
2002
2003
2004
2005
2006
2007
2008
2009
2010
2011
2012
2013
2014
2015
2016
2017
2018
2019
2020
2021
2022
2023
2024


In [37]:
%%bash
ls example_large_partitioned/2000/

part-0.parquet


The [pyarrow.dataset.dataset()](https://arrow.apache.org/docs/python/generated/pyarrow.dataset.dataset.html#pyarrow.dataset.dataset) function provides an interface to discover and read all the partition files as a single big dataset.

In [43]:
import pyarrow.dataset as ds

dataset = ds.dataset("./example_large_partitioned", format="parquet", partitioning=ds.partitioning(pa.schema([("year", pa.int32())])))
dataset.files

['./example_large_partitioned/2000/part-0.parquet',
 './example_large_partitioned/2001/part-0.parquet',
 './example_large_partitioned/2002/part-0.parquet',
 './example_large_partitioned/2003/part-0.parquet',
 './example_large_partitioned/2004/part-0.parquet',
 './example_large_partitioned/2005/part-0.parquet',
 './example_large_partitioned/2006/part-0.parquet',
 './example_large_partitioned/2007/part-0.parquet',
 './example_large_partitioned/2008/part-0.parquet',
 './example_large_partitioned/2009/part-0.parquet',
 './example_large_partitioned/2010/part-0.parquet',
 './example_large_partitioned/2011/part-0.parquet',
 './example_large_partitioned/2012/part-0.parquet',
 './example_large_partitioned/2013/part-0.parquet',
 './example_large_partitioned/2014/part-0.parquet',
 './example_large_partitioned/2015/part-0.parquet',
 './example_large_partitioned/2016/part-0.parquet',
 './example_large_partitioned/2017/part-0.parquet',
 './example_large_partitioned/2018/part-0.parquet',
 './example_

Note that no data has been read yet. The whole dataset can be viewed as a single big table using [pyarrow.dataset.Dataset.to_table()](https://arrow.apache.org/docs/python/generated/pyarrow.dataset.Dataset.html#pyarrow.dataset.Dataset.to_table).

Keep in mind that converting to a table forces all data to be loaded into memory. For big datasets, this is usually not what you want.

In [44]:
table = dataset.to_table()
print(table)


pyarrow.Table
anzsic06: string
Area: string
geo_count: int64
ec_count: int64
year: int32
----
anzsic06: [["C25","C25","C25","C25","C25",...,"F341","F341","F341","F341","F341"],["M70","M70","M70","M70","M70",...,"Q851","Q851","Q851","Q851","Q851"],...,["O77","O77","O77","O77","O77",...,"R","R","R","R","R"],["F373","F373","F373","F373","F373",...,"H440","H440","H440","H440","H440"]]
Area: [["A215000","A215101","A215200","A215401","A215600",...,"A119000","A119500","A119700","A120000","A120500"],["A166400","A166500","A166700","A167000","A167101",...,"A146000","A146400","A146501","A146600","A146700"],...,["A237100","A237200","A237500","A237600","A237700",...,"A161400","A161500","A161700","A161800","A161900"],["R07","R08","R09","R12","R13",...,"A340700","A341101","A341201","A341300","A341400"]]
geo_count: [[0,0,0,3,3,...,0,0,0,0,3],[3,0,0,3,0,...,0,6,3,0,12],...,[0,0,3,0,3,...,15,6,3,6,0],[48,84,282,15,474,...,3,18,3,3,6]]
ec_count: [[0,0,0,3,0,...,0,3,0,0,0],[15,0,0,0,3,...,0,6,0,0,18],...,

There are different ways to exploit partitioning; the easiest is filtering when reading.

A scan returns only the rows matching the filter. Where possible, the predicate is pushed down to exploit the partition information or internal metadata found in the data source, e.g. Parquet statistics.

In [46]:
filtered_dataset = dataset.filter(ds.field('year') == 2020)
table = filtered_dataset.to_table()
print(table)


pyarrow.Table
anzsic06: string
Area: string
geo_count: int64
ec_count: int64
year: int32
----
anzsic06: [["Total","Total","Total","Total","Total",...,"Total","Total","Total","Total","Total"],["B08","B08","B08","B08","B08",...,"C259","C259","C259","C259","C259"],...,["N73","N73","N73","N73","N73",...,"Q860","Q860","Q860","Q860","Q860"],["Q860","Q860","Q860","Q860","Q860",...,"Total","Total","Total","Total","Total"]]
Area: [["A149600","A149701","A149702","A149800","A149901",...,"T073","T074","T075","T076","TTotal"],["A335301","A335701","A336800","A340700","A343600",...,"A113300","A113402","A113800","A114000","A114500"],...,["A330600","A330700","A330800","A330900","A331000",...,"A305600","A305800","A305900","A306100","A306501"],["A306801","A307301","A307501","A307601","A307801",...,"A149100","A149200","A149300","A149400","A149500"]]
geo_count: [[153,270,114,477,450,...,6960,2049,5316,207252,596973],[3,0,0,0,3,...,0,3,6,0,6],...,[3,3,3,3,6,...,0,3,3,0,0],[3,0,6,3,0,...,168,300,210,111,81]]