## Import PPDB base catalog

This notebook imports the base PPDB catalog (2025/26 fall and winter data).

In [1]:
import lsdb
import pandas as pd
import numpy as np
import tempfile

from dask.distributed import Client
from datetime import date
from hats_import import pipeline_with_client
from hats_import.catalog import ImportArguments
from hats_import.collection.arguments import CollectionArguments
from ppdb_hats.daily.ingest import import_catalog
from ppdb_hats.daily.nest import load_sources_with_margin, nest_sources
from nested_pandas.utils import count_nested
from pathlib import Path
from ppdb_hats.daily.paths import get_paths, append_input_paths
from ppdb_hats.daily.postprocess import postprocess_catalog
from ppdb_hats.config import get_default_config

# Pipeline configuration
config = get_default_config()

Set up the input/output dirs:

In [2]:
# LSST PPDB repository
PPDB_DIR = Path("/sdf/scratch/rubin/ppdb/data/ppdb_lsstcam")

# Base HATS directory
output_dir = Path("/sdf/data/rubin/shared/lsdb_commissioning/ppdb")

Set up the Dask client:

In [3]:
# Temporary directory
tmp_dir = Path(tempfile.TemporaryDirectory().name)
print(f"Intermediate directory: {tmp_dir}")

# Dask client
client = Client(n_workers=16, memory_limit="8GB", threads_per_worker=1, local_directory=tmp_dir)

Intermediate directory: /lscratch/stavar/tmp/tmp24v0n6sx




### Get input files for each dataset type

We will grab all files until Jan 20, inclusive.

In [4]:
until_date = date(2026, 1, 20)
object_files = get_paths("dia_object", PPDB_DIR, until_date=until_date)
source_files = get_paths("dia_source", PPDB_DIR, until_date=until_date)
fsource_files = get_paths("dia_forced_source", PPDB_DIR, until_date=until_date)

### Import base catalogs

In [5]:
import_catalog(client, tmp_dir, "dia_object", object_files, config.import_config)
import_catalog(client, tmp_dir, "dia_source", source_files, config.import_config)
import_catalog(client, tmp_dir, "dia_forced_source", fsource_files, config.import_config)

Catalog: Planning  : 100%|███████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 4/4 [00:00<00:00, 1154.42it/s]
Catalog: Mapping   : 100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 249/249 [00:18<00:00, 13.67it/s]
Catalog: Binning   : 100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████

### Post-processing

About 8% of objects have duplicates (same `diaObjectId`):

In [6]:
dia_object = lsdb.open_catalog(tmp_dir / "dia_object")
_, counts = np.unique(dia_object["diaObjectId"], return_counts=True)
n_dup_ids = np.sum(counts > 1)
n_dup_ids / len(dia_object) * 100

np.float64(7.705989774842453)

We will keep the oned of latest `validityStartMjdTai`, and add mag/magerr columns:

In [7]:
position_time_cols = config.postprocess_config.position_time_cols

postprocess_catalog(
    client,
    tmp_dir,
    "dia_object",
    position_time_cols=position_time_cols,
    validity_col="validityStartMjdTai",
    flux_colnames=[f"{band}_scienceFluxMean" for band in "ugrizy"],
)
postprocess_catalog(
    client,
    tmp_dir,
    "dia_source",
    position_time_cols=position_time_cols,
    flux_colnames=["scienceFlux"],
)
postprocess_catalog(
    client,
    tmp_dir,
    "dia_forced_source",
    position_time_cols=position_time_cols,
    flux_colnames=["scienceFlux"],
)

  return getattr(ufunc, method)(*new_inputs, **kwargs)
  return getattr(ufunc, method)(*new_inputs, **kwargs)
  return getattr(ufunc, method)(*new_inputs, **kwargs)
  return getattr(ufunc, method)(*new_inputs, **kwargs)
dia_object: 100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 4/4 [00:01<00:00,  2.53it/s]
  return getattr(ufunc, method)(*new_inputs, **kwargs)
  return getattr(ufunc, method)(*new_inputs, **kwargs)
  return getattr(ufunc, method)(*new_inputs, **kwargs)
  return getattr(ufunc, method)(*new_inputs, **kwargs)
  return getattr(ufunc, method)(*new_inputs, **kwargs)
  return getattr(ufunc, method)(*new_inputs, **kwargs)
dia_source: 100%|███████████████████████████████████████████████████████████████████████████████████████

### Nest sources in objects

In [8]:
dia_object = lsdb.read_hats(tmp_dir / "dia_object")
margin_threshold = config.margin_threshold
dia_source = load_sources_with_margin(client, tmp_dir, "dia_source", margin_threshold)
dia_forced_source = load_sources_with_margin(client, tmp_dir, "dia_forced_source", margin_threshold)

There are ~6% of sources with no `diaObjectId`:

In [9]:
n_sources_no_objid = np.sum(dia_source["diaObjectId"].isna().compute())
n_sources_no_objid / len(dia_source) * 100

np.float64(5.6193954448733034)

We'll need to filter them out otherwise we cannot nest:

In [10]:
valid_dia_source = dia_source[~dia_source["diaObjectId"].isna()]

That does not seem to be an issue for `diaForcedSource`:

In [11]:
np.sum(dia_forced_source["diaObjectId"].isna().compute())

np.int64(0)

Nest sources and forced sources and write to disk:

In [12]:
dia_object_lc = nest_sources(dia_object, valid_dia_source, dia_forced_source)



Save the results to disk:

In [13]:
dia_object_lc.write_catalog(tmp_dir, catalog_name="dia_object_lc")

Let's reimport with leaf pixel directories:

In [14]:
dia_collection_dir = config.paths.dia_object_collection_dir

In [15]:
args = ImportArguments.reimport_from_hats(
    tmp_dir / "dia_object_lc",
    output_dir=dia_collection_dir,
    byte_pixel_threshold=1 << 30,
    npix_suffix="/",
    addl_hats_properties={"hats_npix_suffix": "/"},
    simple_progress_bar=True,
)
pipeline_with_client(args, client)

Validating catalog at path /lscratch/stavar/tmp/tmp24v0n6sx/dia_object_lc ... 
Found 6 partitions.
Approximate coverage is 15.62 % of the sky.


Catalog: Planning  : 100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 4/4 [00:00<00:00, 47.24it/s]
Catalog: Mapping   : 100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 6/6 [01:51<00:00, 18.51s/it]
Catalog: Binning   : 100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 2/2 [00:48<00:00, 24.29s/it]
Catalog: Splitting : 100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 6/6 [00:04<00:00,  1.43it/s]
Catalog: Reducing  : 100%|██████████████████████████████████████████████████████████████████████████████████████████████████████

### Finish collection

Generate margin and index catalogs.

In [16]:
args = (
    CollectionArguments(
        output_artifact_name="dia_object_collection",
        new_catalog_name="dia_object_lc",
        output_path=output_dir,
        simple_progress_bar=True,
    )
    .catalog(
        catalog_path=dia_collection_dir / "dia_object_lc",
    )
    .add_margin(margin_threshold=5.0, is_default=True)
    .add_index(indexing_column="diaObjectId")
)
pipeline_with_client(args, client)

Margin: Planning  : 100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 3/3 [00:02<00:00,  1.43it/s]
Margin: Mapping   : 100%|████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 11/11 [00:01<00:00,  9.81it/s]
Margin: Binning   : 100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 1/1 [00:00<00:00, 340.56it/s]
Margin: Reducing  : 100%|███████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 36/36 [00:00<00:00, 197.09it/s]
Margin: Finishing : 100%|███████████████████████████████████████████████████████████████████████████████████████████████████████

Let's also store which files we ingested in this run:

In [17]:
append_input_paths("dia_object", object_files, dia_collection_dir)
append_input_paths("dia_source", source_files, dia_collection_dir)
append_input_paths("dia_forced_source", fsource_files, dia_collection_dir)

### Some validation

In [18]:
df = lsdb.open_catalog(output_dir / "dia_object_collection").compute()

#### Checking objects

We have the same set of objects in the collection as in the original data:

In [19]:
input_objs = pd.read_parquet(object_files, dtype_backend="pyarrow")
input_obj_ids = np.unique(input_objs["diaObjectId"])
assert set(df["diaObjectId"]) == set(input_obj_ids)
assert len(input_obj_ids) == len(df)

#### Checking sources

All objects have sources:

In [20]:
len(df[~df["diaSource"].isna()]) / len(df) * 100

100.0

There are as many sources in the collection as in the base catalog (minus those with no diaObjectId):

In [21]:
input_sources = pd.read_parquet(source_files, dtype_backend="pyarrow")
expected_sources = input_sources[~input_sources["diaObjectId"].isna()]
assert len(expected_sources) == len(df["diaSource"].explode())

Though there are ~7% of objects for which "nDiaSource" doesn't match the number of "diaSource" we got:

In [22]:
count_df = count_nested(df, "diaSource", join=True)
unmatched = count_df[count_df["nDiaSources"] != count_df["n_diaSource"]]
len(unmatched) / len(count_df) * 100

7.255606249555245

#### Checking forced sources

Only ~5% of objects have forced sources:

In [23]:
len(df[~df["diaForcedSource"].isna()]) / len(df) * 100

5.169310311439514

This seems to make sense according to the input data:

In [None]:
forced_sources = pd.read_parquet(fsource_files, dtype_backend="pyarrow")
expected_f_sources = forced_sources[~forced_sources["diaObjectId"].isna()]
assert len(expected_f_sources) == len(df["diaForcedSource"].explode())

In [25]:
client.close()