# map_partitions

## Learning Objectives

In this tutorial, you will learn:

  * how to apply a user-defined function using distributed computing across an entire LSDB catalog's partitions
  * when, why, and how to supply required Dask "meta" when it cannot be inferred from your user-defined function

## Introduction

LSDB/HATS catalogs are organized into partitions. The number of partitions is reported as `npartitions=` in the header whenever a catalog is printed. Each partition corresponds to a single HEALPix pixel, and the partitions are sized to hold approximately the same number of rows, which enables efficient parallel computation.

The `map_partitions` method provides a means for users to execute their own analysis functions on each *partition* of the catalog data. The data will be passed to your function as a Pandas DataFrame (`pd.DataFrame`) object.
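As a mental model, the method behaves like applying your function to each partition's DataFrame and combining the pieces. The sketch below imitates that with plain pandas on two small hand-made frames. The names `toy_partitions` and `add_ra_wrapped` are illustrative only, and this is not how LSDB or Dask actually schedule the work.

```python
import pandas as pd

# Two hand-made frames standing in for catalog partitions (illustrative data only).
toy_partitions = [
    pd.DataFrame({"ra": [10.0, 350.0], "dec": [-5.0, 2.0]}),
    pd.DataFrame({"ra": [181.0], "dec": [45.0]}),
]


def add_ra_wrapped(df):
    # Return a new frame with right ascension wrapped into [-180, 180).
    return df.assign(ra_wrapped=(df["ra"] + 180.0) % 360.0 - 180.0)


# Apply the function to each partition independently, then combine the pieces.
pieces = [add_ra_wrapped(part) for part in toy_partitions]
combined = pd.concat(pieces, ignore_index=True)
print(combined)
```

In the real method the per-partition calls can run in parallel on Dask workers, which is where the benefit comes from.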

## 1. Open a catalog

For this example, we will use a small cone of the Gaia DR3 catalog, and only specify columns of interest. This limits the overall memory requirements of the pipeline.

In [1]:
# Dask puts out more advisory logging than we care for in this tutorial.
# It takes some doing to quiet all of it, but this recipe works.

import dask

dask.config.set({"logging.distributed": "critical"})

import logging

# This also has to be done, for the above to be effective
logger = logging.getLogger("distributed")
logger.setLevel(logging.CRITICAL)

import warnings

# Finally, suppress the specific warning about Dask dashboard port usage
warnings.filterwarnings("ignore", message="Port 8787 is already in use.")

In [2]:
import lsdb

# catalog_root = "https://data.lsdb.io/hats"
catalog_root = "/data3/epyc/data3/hats/catalogs"
gaia3 = lsdb.open_catalog(
    f"{catalog_root}/gaia_dr3",
    margin_cache="gaia_10arcs",
    search_filter=lsdb.ConeSearch(ra=280, dec=-60, radius_arcsec=2 * 3600),
    columns=[
        "source_id",
        "ra",
        "dec",
        "phot_g_mean_mag",
    ],
)
gaia3

| npartitions=4 | source_id | ra | dec | phot_g_mean_mag |
| --- | --- | --- | --- | --- |
| Order: 4, Pixel: 2944 | int64[pyarrow] | double[pyarrow] | double[pyarrow] | double[pyarrow] |
| Order: 4, Pixel: 2945 | ... | ... | ... | ... |
| Order: 4, Pixel: 2946 | ... | ... | ... | ... |
| Order: 4, Pixel: 2947 | ... | ... | ... | ... |


In [3]:
# You can get the number of partitions programmatically this way.
# This can be valuable when you want to choose the optimal number
# of workers to process the partitions.
gaia3.npartitions

4

In [4]:
# You can also access the HealpixPixel object for each partition
# and inspect its order and pixel number, if desired.
px = gaia3.get_healpix_pixels()[0]
px.order, px.pixel

(np.int64(4), np.int64(2944))

## 2. Generating New Columns

Since the partition's `pd.DataFrame` is passed in to your custom function, you can augment it with new columns based on the existing columns, in ordinary Pandas style.

### 2.1 What you can map

The trick is understanding what kind of custom function you can pass to `.map_partitions`.  Your function is going to receive a Pandas DataFrame as its first parameter.  Other parameters can be passed in as keyword arguments to `.map_partitions`, as you'll see later on.  For now, we'll use a function that takes in one partition and produces a result that has the same shape.
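Before handing a function to `.map_partitions`, it can help to run it on a tiny hand-built DataFrame and confirm that the shape comes out as expected. The sketch below does this with plain pandas. The name `add_squared_column` and the sample magnitudes are hypothetical, and using `assign` to leave the input untouched is a design choice here, not a requirement stated by this tutorial.

```python
import pandas as pd


def add_squared_column(df, column="phot_g_mean_mag"):
    # Build a new frame rather than modifying the caller's frame in place.
    return df.assign(**{f"{column}_sq": df[column] ** 2})


# Made-up magnitudes, used only to exercise the function.
sample = pd.DataFrame({"phot_g_mean_mag": [20.0, 19.5, 16.0]})
out = add_squared_column(sample)

# Same number of rows, one extra column, and the input is untouched.
print(out)
print(list(sample.columns))
```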

Because the catalog is loaded lazily, `.map_partitions` also returns a lazy, or unevaluated, result.  You can see the results the same way you can realize the original catalog, by any of these means:
  * calling `.compute()` to produce a `pd.DataFrame` in memory;
  * calling `.to_hats()` to serialize it to disk as a HATS-format file;
  * calling `.head()` to see the first few rows, `.tail()` to see the last few.

In [5]:
def mean_sq(df, pixel):
    # `pixel` is supplied because include_pixel=True is set below; it is unused here.
    df["phot_g_mean_mag_sq"] = df["phot_g_mean_mag"] ** 2
    return df


unrealized = gaia3.map_partitions(mean_sq, include_pixel=True)
unrealized

| npartitions=4 | source_id | ra | dec | phot_g_mean_mag | phot_g_mean_mag_sq |
| --- | --- | --- | --- | --- | --- |
| Order: 4, Pixel: 2944 | int64[pyarrow] | double[pyarrow] | double[pyarrow] | double[pyarrow] | double[pyarrow] |
| Order: 4, Pixel: 2945 | ... | ... | ... | ... | ... |
| Order: 4, Pixel: 2946 | ... | ... | ... | ... | ... |
| Order: 4, Pixel: 2947 | ... | ... | ... | ... | ... |


Let's take a quick peek to check that our function works correctly and that the values in the new column are about what we expect:

In [6]:
head_5 = unrealized.head(5)
head_5

| _healpix_29 | source_id | ra | dec | phot_g_mean_mag | phot_g_mean_mag_sq |
| --- | --- | --- | --- | --- | --- |
| 3315212135629220059 | 6630424242158614528 | 279.475941 | -61.973682 | 20.157476 | 406.323839 |
| 3315212197603296958 | 6630424379597543936 | 279.40738 | -61.977054 | 19.488537 | 379.803074 |
| 3315212213755151065 | 6630424413957281792 | 279.436314 | -61.979345 | 18.925087 | 358.158918 |
| 3315212218438134563 | 6630424418254234752 | 279.412417 | -61.979072 | 20.522877 | 421.18848 |
| 3315212218704706046 | 6630424418256341376 | 279.416172 | -61.977445 | 16.336718 | 266.888355 |


Looks good! Now on to computing the complete result.

Like the catalog itself, this unrealized result has an `npartitions` property, and because `.map_partitions` produces one output partition per input partition here, the two values agree. We can use this number to choose how many workers to start.

However, it's a good idea to put an upper bound on the number of workers, in case the number of partitions is larger than we expect (or we later move this code fragment elsewhere).
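One way to express such a bound as a reusable helper is sketched below. The function `choose_n_workers` is hypothetical, and also capping at the local CPU count is an extra assumption that goes beyond what this tutorial does; adjust or drop it for a remote cluster.

```python
import os


def choose_n_workers(npartitions, max_workers=4):
    # Never ask for more workers than partitions, CPUs, or the chosen cap,
    # and always ask for at least one.
    cpus = os.cpu_count() or 1
    return max(1, min(npartitions, max_workers, cpus))


print(choose_n_workers(4))
print(choose_n_workers(3933, max_workers=8))
```

The returned value could then be passed as `n_workers=` when creating the Dask client.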

In [7]:
%%time
from dask.distributed import Client

npartitions = gaia3.npartitions

# Create a client which will be implicitly used until we make a new one
client = Client(n_workers=min(4, npartitions), memory_limit="auto")

result = unrealized.compute()
result

CPU times: user 2.25 s, sys: 804 ms, total: 3.05 s
Wall time: 10.2 s


| _healpix_29 | source_id | ra | dec | phot_g_mean_mag | phot_g_mean_mag_sq |
| --- | --- | --- | --- | --- | --- |
| 3315212135629220059 | 6630424242158614528 | 279.475941 | -61.973682 | 20.157476 | 406.323839 |
| 3315212197603296958 | 6630424379597543936 | 279.40738 | -61.977054 | 19.488537 | 379.803074 |
| ... | ... | ... | ... | ... | ... |
| 3318663093295462547 | 6637326190180387584 | 280.426265 | -58.015067 | 21.097507 | 445.104802 |
| 3318663093808668415 | 6637326185883831296 | 280.423503 | -58.013744 | 20.301455 | 412.149075 |


No reduction step is needed here, since the function does not reduce its input:
there are as many rows in the output as there were in the input.

## 3. Functions that reduce

The above works when your output has one row for each input row.  When you're performing a reducing operation (such as calculating statistics), the process changes a little.

### 3.1. Your function's parameters

Again, your first input parameter is a `pd.DataFrame` holding one partition of the catalog. The return value of your function must also be a `pd.DataFrame`, even if your result has only a single row.

If you want to know the HEALPix number of the partition, calling `.map_partitions` with `include_pixel=True` will pass that as the second parameter to your function.  We'll do this in this example, for demonstration purposes, though it isn't strictly necessary to this task.

If you have any other parameters that your function requires, take them as keyword arguments, and you can pass their values in as such, when calling `.map_partitions`.  Our example will do this, too, taking a `target_column=` argument.
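To picture how these arguments reach your function, here is a toy stand-in written with plain pandas. `call_on_partition` and `column_range` are hypothetical names, the pixel is a plain tuple rather than a real `HealpixPixel`, and none of this is LSDB's actual code; it only mirrors the calling convention described above.

```python
import pandas as pd


def call_on_partition(func, df, pixel, include_pixel=False, **kwargs):
    # Toy stand-in: pass the pixel as the second positional argument only
    # when asked to, and forward every other keyword argument untouched.
    if include_pixel:
        return func(df, pixel, **kwargs)
    return func(df, **kwargs)


def column_range(df, pixel, target_column=""):
    col = df[target_column]
    return pd.DataFrame([{"pixel": pixel, "range": col.max() - col.min()}])


# Made-up magnitudes for a single pretend partition.
frame = pd.DataFrame({"phot_g_mean_mag": [18.0, 20.5, 16.5]})
row = call_on_partition(
    column_range, frame, (4, 2944), include_pixel=True, target_column="phot_g_mean_mag"
)
print(row)
```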

### 3.2. What you get back

The operation we're going to do here is a reducing operation (min, max, and mean). It will be run on each partition, reducing the many rows in each partition to a single row.  This means that the output of `.map_partitions` in this case will contain *one row per partition*, so you will need to reduce this output further to get a single final result.

In [8]:
# Note that this function must work correctly when given an empty DataFrame
# as an input, too; if not, you're obliged to provide "meta", that is,
# information about output type and shape.
import pandas as pd


def find_stats(df, pixel, target_column=""):
    c = df[target_column]
    min_val = c.min()
    max_val = c.max()
    mean_val = c.mean()
    return pd.DataFrame(
        [
            {
                "pixel": pixel,
                f"{target_column}_min": min_val,
                f"{target_column}_max": max_val,
                f"{target_column}_mean": mean_val,
            }
        ]
    )

### 3.3 When You Need `meta=`

The above definition of `find_stats` works even with an empty `pd.DataFrame` argument because `.mean()` is written to handle zero-row inputs without errors.
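You can check this behaviour yourself with plain pandas by passing in a zero-row frame, which is the situation described above. In the sketch below, `summarize` is a hypothetical stand-in for `find_stats` that omits the pixel argument, and the frame uses a NumPy-backed `float64` column rather than the pyarrow-backed columns of the real catalog.

```python
import pandas as pd


def summarize(df, target_column=""):
    c = df[target_column]
    return pd.DataFrame(
        [
            {
                f"{target_column}_min": c.min(),
                f"{target_column}_max": c.max(),
                f"{target_column}_mean": c.mean(),
            }
        ]
    )


# A frame with the right column but no rows.
empty = pd.DataFrame({"phot_g_mean_mag": pd.Series(dtype="float64")})
probe = summarize(empty, target_column="phot_g_mean_mag")

# No exception is raised; the statistics come back as missing values.
print(probe)
```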

But your own custom function might not. As a trivial example, suppose you implemented the arithmetic mean yourself, as below.

In [9]:
# This version of the function will NOT work with map_partitions as is, because
# of the attempt to divide by `c.count()`, which will be zero for an empty input.
import pandas as pd


def find_stats_needs_meta(df, pixel, target_column=""):
    c = df[target_column]
    min_val = c.min()
    max_val = c.max()
    # WARNING! c.count() == 0 when passed an empty DataFrame.
    # But meta= will come to the rescue.
    mean_val = c.sum() / c.count()
    return pd.DataFrame(
        [
            {
                "pixel": pixel,
                f"{target_column}_min": min_val,
                f"{target_column}_max": max_val,
                f"{target_column}_mean": mean_val,
            }
        ]
    )

In the above case, then, you need to indicate to Dask what type the output will be.

What Dask needs to know are the column names and their order, and so the below definition works, even though the types of the columns aren't indicated. (The type of `"pixel"` will default to `float64`, which is wrong, but doesn't matter in this case.)

In [10]:
output_meta = pd.DataFrame(
    {
        "pixel": [],
        "phot_g_mean_mag_min": [],
        "phot_g_mean_mag_max": [],
        "phot_g_mean_mag_mean": [],
    }
)

Here's another definition of `output_meta` that works equally well, and has the advantage that the way the `pd.DataFrame` is initialized is the same form as the return value in the custom function. The type of `"pixel"` will now be more correct:

In [11]:
output_meta = pd.DataFrame(
    [
        {
            "pixel": lsdb.HealpixPixel(0, 0),
            "phot_g_mean_mag_min": 0.0,
            "phot_g_mean_mag_max": 0.0,
            "phot_g_mean_mag_mean": 0.0,
        }
    ]
)

In [12]:
output_meta["pixel"].dtype

dtype('O')

Pandas itself, however, only records the column's dtype as a generic `object`, as we can see above.
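If you would rather state each column's dtype explicitly, one option is to build a zero-row frame from typed empty Series. This is a sketch with plain pandas; the choice of `object` for the pixel column and `float64` for the statistics is an assumption about what the custom function returns, and `typed_meta` is an illustrative name.

```python
import pandas as pd

target_column = "phot_g_mean_mag"

# Zero-row frame whose columns carry explicit (assumed) dtypes.
typed_meta = pd.DataFrame(
    {
        "pixel": pd.Series(dtype="object"),
        f"{target_column}_min": pd.Series(dtype="float64"),
        f"{target_column}_max": pd.Series(dtype="float64"),
        f"{target_column}_mean": pd.Series(dtype="float64"),
    }
)

print(typed_meta.dtypes)
```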

These can be complicated and error-prone to construct, and small mistakes can create confusing errors that show up late in the computation.

To help with these difficulties, Dask provides a `make_meta` function. If you can run your custom function on a single valid row of catalog data (which is what `head_5.head(1)` supplies), `make_meta` will generate the meta from the function's output.

**NOTE:** It's much faster here to use the computed data from last time (`head_5`) than trying to use `gaia3.head(1)`. The latter will still give the right answer, but will be much slower, since it obliges Dask to re-execute the computation.

In [13]:
%%time
from dask.dataframe.utils import make_meta

output_meta = make_meta(
    find_stats_needs_meta(head_5.head(1), gaia3.get_healpix_pixels()[0], target_column="phot_g_mean_mag")
)

CPU times: user 3.64 ms, sys: 3.84 ms, total: 7.48 ms
Wall time: 5.82 ms


Passing a correct `meta=` to `.map_partitions` will allow Dask to skip the process of sending your function an empty `pd.DataFrame`, and so, in our case of `find_stats_needs_meta` (where we depend on a non-zero `c.count()`), it will succeed without error.
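Since a mismatched `meta=` tends to surface late, a cheap sanity check before launching the full computation can save time. The helper below is a hypothetical sketch in plain pandas: it compares only column names and their order against the output of your function on a small sample, and the frames used here are made-up stand-ins.

```python
import pandas as pd


def check_meta(meta, sample_output):
    # Compare column names and order; raise early with a readable message.
    expected = list(sample_output.columns)
    actual = list(meta.columns)
    if actual != expected:
        raise ValueError(f"meta columns {actual} do not match output columns {expected}")
    return True


# Stand-ins for a real meta frame and for the function's output on one row.
meta = pd.DataFrame({"pixel": [], "mag_min": [], "mag_max": []})
sample_output = pd.DataFrame([{"pixel": "p0", "mag_min": 1.0, "mag_max": 2.0}])

print(check_meta(meta, sample_output))
```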

In [14]:
unrealized = gaia3.map_partitions(
    find_stats_needs_meta,
    include_pixel=True,
    # Keyword arguments after 'include_pixel=' are passed to your function.
    target_column="phot_g_mean_mag",
    # Here we give Dask the hint it needs to avoid giving us an empty frame
    meta=output_meta,
)

In [15]:
%%time
result = unrealized.compute()
result

CPU times: user 234 ms, sys: 85.5 ms, total: 319 ms
Wall time: 733 ms


|   | pixel | phot_g_mean_mag_min | phot_g_mean_mag_max | phot_g_mean_mag_mean |
| --- | --- | --- | --- | --- |
| 0 | Order: 4, Pixel: 2944 | 8.154537 | 22.033451 | 19.08384 |
| 0 | Order: 4, Pixel: 2945 | 5.508409 | 22.34575 | 19.094207 |
| 0 | Order: 4, Pixel: 2946 | 6.264917 | 22.07638 | 19.10286 |
| 0 | Order: 4, Pixel: 2947 | 6.154421 | 21.986425 | 19.119143 |


The objects in the `pixel` column are of the same type as those returned by `get_healpix_pixels()`.

In [16]:
type(result["pixel"].iloc[0])

hats.pixel_math.healpix_pixel.HealpixPixel

Because the result is one row per partition, we need additional reduction to get our single answer.

In [17]:
result["phot_g_mean_mag_min"].min(), result["phot_g_mean_mag_max"].max()

(np.float64(5.508409), np.float64(22.34575))
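Minima and maxima combine cleanly this way, but the per-partition means need more care: averaging them directly weights every partition equally, regardless of how many rows it holds. The sketch below assumes each per-partition row also carried a row count. The `count` column is an addition, not part of `find_stats` as written, and all numbers are made up.

```python
import pandas as pd

# Made-up per-partition results, shaped like the output above plus a count.
per_partition = pd.DataFrame(
    {
        "mag_min": [8.0, 5.5],
        "mag_max": [22.0, 22.5],
        "mag_mean": [19.0, 20.0],
        "count": [100, 300],
    }
)

overall_min = per_partition["mag_min"].min()
overall_max = per_partition["mag_max"].max()

# Weight each partition's mean by its row count to recover the overall mean.
weighted_sum = (per_partition["mag_mean"] * per_partition["count"]).sum()
weighted_mean = weighted_sum / per_partition["count"].sum()

# The unweighted mean of means is generally a different number.
naive_mean = per_partition["mag_mean"].mean()

print(overall_min, overall_max, weighted_mean, naive_mean)
```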

What about searching not only the four partitions from our cone search, but the whole catalog?  All that changes is the number of partitions.

**NOTE** that since we are now using the `find_stats` that doesn't need `meta=`, we don't need to provide it.

In [18]:
gaia3_all = lsdb.open_catalog(
    f"{catalog_root}/gaia_dr3",
    margin_cache="gaia_10arcs",
    columns=[
        "source_id",
        "ra",
        "dec",
        "phot_g_mean_mag",
    ],
)
unrealized = gaia3_all.map_partitions(
    find_stats,
    include_pixel=True,
    # Keyword arguments after 'include_pixel=' are passed to your
    # function
    target_column="phot_g_mean_mag",
)
npartitions = unrealized.npartitions

In [19]:
npartitions

3933

That's a lot of partitions!  If we didn't put an upper bound on the number of workers, we could easily overwhelm our cluster.

In [20]:
%%time
# Close the old client and make a new one that takes into account the new number of partitions.
client.close()
client = Client(n_workers=min(8, npartitions), memory_limit="auto")
result = unrealized.compute()
result

CPU times: user 5min 39s, sys: 13.1 s, total: 5min 52s
Wall time: 6min 3s


|   | pixel | phot_g_mean_mag_min | phot_g_mean_mag_max | phot_g_mean_mag_mean |
| --- | --- | --- | --- | --- |
| 0 | Order: 2, Pixel: 0 | 3.382374 | 22.452248 | 18.722443 |
| 0 | Order: 3, Pixel: 4 | 3.836123 | 22.281105 | 18.794531 |
| ... | ... | ... | ... | ... |
| 0 | Order: 4, Pixel: 3067 | 6.026669 | 22.432905 | 19.083511 |
| 0 | Order: 3, Pixel: 767 | 3.449377 | 22.423697 | 19.033006 |


In [21]:
# We need to do a final reduction step to get the true min and max
result["phot_g_mean_mag_min"].min(), result["phot_g_mean_mag_max"].max()

(np.float64(1.731607), np.float64(22.956425))

Since we just searched the whole catalog, we can check our answer
against the statistics that were compiled at import time for the
catalog.  As you can see, they match what we got when using
the `.map_partitions` method.

In [22]:
gaia3_all.hc_structure.aggregate_column_statistics(include_columns="phot_g_mean_mag")

| column_names | min_value | max_value | null_count | row_count |
| --- | --- | --- | --- | --- |
| phot_g_mean_mag | 1.731607 | 22.956425 | 5455339.0 | 1811710000.0 |


## Closing the Dask client

In [23]:
client.close()

## About

**Authors**: Derek Jones

**Last updated on**: April 17, 2025

If you use `lsdb` for published research, please cite it following these [instructions](https://docs.lsdb.io/en/stable/citation.html).