# Access the Iceberg catalog

This notebook shows how to load Iceberg tables using [pyiceberg](https://github.com/apache/iceberg-python).

Note: before running this notebook, materialize the Dagster assets. Otherwise, the tables will not exist in the catalog.

## Connecting to the catalog

You can connect to the PostgreSQL catalog using the `pyiceberg.catalog.sql.SqlCatalog` class. The required credentials for MinIO (storage) and PostgreSQL are available as environment variables.

In [1]:
import os

from pyiceberg.catalog.sql import SqlCatalog


catalog = SqlCatalog(
    name="dagster_example_catalog",
    **{
        "uri": os.environ["DAGSTER_SECRET_PYICEBERG_CATALOG_URI"],
        "s3.endpoint": os.environ["DAGSTER_SECRET_S3_ENDPOINT"],
        "s3.access-key-id": os.environ["DAGSTER_SECRET_S3_ACCESS_KEY_ID"],
        "s3.secret-access-key": os.environ[
            "DAGSTER_SECRET_S3_SECRET_ACCESS_KEY"
        ],
        "py-io-impl": "pyiceberg.io.fsspec.FsspecFileIO",
        "warehouse": os.environ["DAGSTER_SECRET_S3_WAREHOUSE"],
    }
)
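Because every setting is read with `os.environ[...]`, an unset variable surfaces as a `KeyError` that names only the first missing key. A minimal sketch of an up-front check, assuming the variable names used in the cell above; `missing_env_vars` is a hypothetical helper, not part of pyiceberg or Dagster:

```python
import os

# Environment variables read by the SqlCatalog cell above.
REQUIRED_VARS = [
    "DAGSTER_SECRET_PYICEBERG_CATALOG_URI",
    "DAGSTER_SECRET_S3_ENDPOINT",
    "DAGSTER_SECRET_S3_ACCESS_KEY_ID",
    "DAGSTER_SECRET_S3_SECRET_ACCESS_KEY",
    "DAGSTER_SECRET_S3_WAREHOUSE",
]


def missing_env_vars(required, environ=None):
    """Return the names in `required` that are unset or empty (hypothetical helper)."""
    env = os.environ if environ is None else environ
    return [name for name in required if not env.get(name)]


missing = missing_env_vars(REQUIRED_VARS)
if missing:
    # Report every missing variable at once instead of failing on the first.
    print("Missing environment variables:", ", ".join(missing))
```

Running this before constructing the catalog lists all gaps in one pass, which tends to be quicker to act on than repeated `KeyError`s.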

If you don't see a namespace in the output below, run `just nc` from the repository root.

In [2]:
catalog.list_namespaces()

[('air_quality',)]

If you've materialized the `daily_air_quality_data` asset, you'll see it listed under the `air_quality` namespace.

In [3]:
catalog.list_tables(namespace="air_quality")

[('air_quality', 'daily_air_quality_data'),
 ('air_quality', 'stg_measurements'),
 ('air_quality', 'int_measurements_by_station_and_date'),
 ('air_quality', 'int_components_by_station'),
 ('air_quality', 'stg_test'),
 ('air_quality', 'int_test'),
 ('air_quality', 'stg_luchtmeetnet__measurements'),
 ('air_quality', 'int_measurement_stations_joined_with_components'),
 ('air_quality', 'int_measurements_aggregate_by_measurement_date'),
 ('air_quality', 'int_measurements')]
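The listing is a plain list of `(namespace, table)` tuples, so it can be filtered with ordinary Python. A small sketch, assuming the `stg_` and `int_` prefixes mark staging and intermediate tables (a convention inferred from the names above, not something the catalog enforces); `tables_with_prefix` and `has_table` are hypothetical helpers:

```python
# A hand-copied subset of the identifiers shown in the output above.
tables = [
    ("air_quality", "daily_air_quality_data"),
    ("air_quality", "stg_measurements"),
    ("air_quality", "int_measurements_by_station_and_date"),
    ("air_quality", "int_measurements"),
]


def tables_with_prefix(identifiers, prefix):
    """Return table names (without namespace) that start with `prefix`."""
    return [name for _namespace, name in identifiers if name.startswith(prefix)]


def has_table(identifiers, namespace, name):
    """Check whether `namespace.name` appears in the listing."""
    return (namespace, name) in identifiers


print(tables_with_prefix(tables, "stg_"))
print(has_table(tables, "air_quality", "daily_air_quality_data"))
```

In the notebook, the result of `catalog.list_tables(namespace="air_quality")` could be passed in place of the hand-copied `tables` list.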

You can load the table as follows:

In [4]:
table_daily_air_quality_data = catalog.load_table("air_quality.daily_air_quality_data")

table_daily_air_quality_data

daily_air_quality_data(
  1: station_number: optional string,
  2: value: optional double,
  3: timestamp_measured: optional string,
  4: formula: optional string,
  5: measurement_date: optional date
),
partition by: [measurement_date],
sort order: [],
snapshot: Operation.APPEND: id=7801671428668118541, parent_id=6567737807636200600, schema_id=1

As you can see, the table is partitioned by the column `measurement_date`. This is because the `daily_air_quality_data` asset is partitioned on this column:

```python
# src/dagster_pyiceberg_example/partitions.py
import datetime

from dagster import DailyPartitionsDefinition

daily_partition = DailyPartitionsDefinition(
    start_date=datetime.datetime(2024, 10, 20),
    end_offset=0,
    timezone="Europe/Amsterdam",
    fmt="%Y-%m-%d",
)

# src/dagster_pyiceberg_example/assets/__init__.py
from dagster import AssetIn, MultiToSingleDimensionPartitionMapping, asset

@asset(
    description="Copy air quality data to iceberg table",
    compute_kind="iceberg",
    io_manager_key="warehouse_io_manager",
    partitions_def=daily_partition,
    ins={
        "ingested_data": AssetIn(
            "air_quality_data",
            # NB: need this to control which downstream asset partitions are materialized
            partition_mapping=MultiToSingleDimensionPartitionMapping(
                partition_dimension_name="daily"
            ),
            input_manager_key="landing_zone_io_manager",
            # NB: Some partitions can fail because of 500 errors from API
            #  So we need to allow missing partitions
            metadata={"allow_missing_partitions": True},
        )
    },
    code_version="v1",
    group_name="measurements",
    metadata={
        "partition_expr": "measurement_date",
    },
)
def daily_air_quality_data():
    ...
```
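Each partition key is a date string in the `fmt` shown above, and the `partition_expr` metadata names the table column those keys correspond to. A rough sketch of the keys such a daily definition covers, using only the standard library. It illustrates the key format and is not Dagster's implementation: `daily_partition_keys` is a hypothetical helper, it ignores time zones, and treating the end date as exclusive is an assumption of this sketch.

```python
import datetime


def daily_partition_keys(start, end, fmt="%Y-%m-%d"):
    """List one key per day from `start` up to, but not including, `end`."""
    days = max((end - start).days, 0)
    return [
        (start + datetime.timedelta(days=offset)).strftime(fmt)
        for offset in range(days)
    ]


keys = daily_partition_keys(
    datetime.date(2024, 10, 20),  # same start date as `daily_partition`
    datetime.date(2024, 10, 23),  # arbitrary end date for illustration
)
print(keys)
```

Each key produced this way has the same shape as the `measurement_date` values stored in the table, for example `2024-11-09`.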

Each snapshot carries metadata about the write that produced it. Its summary also records the Dagster run ID and partition key that generated the snapshot.

In [5]:
table_daily_air_quality_data.snapshots()[0].model_dump()

{'snapshot-id': 5902355183492091447,
 'sequence-number': 1,
 'timestamp-ms': 1731271332343,
 'manifest-list': 's3://warehouse/air_quality.db/daily_air_quality_data/metadata/snap-5902355183492091447-0-425c967c-e1d5-4c46-bac3-a89eb6b0e7fd.avro',
 'summary': {'operation': 'append',
  'added-files-size': '32793',
  'added-data-files': '1',
  'added-records': '11235',
  'changed-partition-count': '1',
  'created_by': 'dagster',
  'dagster_run_id': '9034ba5e-e09b-44de-ad9d-e9b2085ec7c0',
  'dagster_partition_key': '2024-11-09',
  'total-data-files': '1',
  'total-delete-files': '0',
  'total-records': '11235',
  'total-files-size': '32793',
  'total-position-deletes': '0',
  'total-equality-deletes': '0'},
 'schema-id': 0}
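The summary values are strings and `timestamp-ms` is milliseconds since the Unix epoch, so a little conversion helps when reading them. A minimal sketch that works on a dict shaped like the `model_dump()` output above, trimmed here to the fields it uses; `describe_snapshot` is a hypothetical helper:

```python
import datetime

# Trimmed copy of the snapshot dict shown above.
snapshot = {
    "snapshot-id": 5902355183492091447,
    "timestamp-ms": 1731271332343,
    "summary": {
        "operation": "append",
        "added-records": "11235",
        "dagster_run_id": "9034ba5e-e09b-44de-ad9d-e9b2085ec7c0",
        "dagster_partition_key": "2024-11-09",
    },
}

EPOCH = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)


def describe_snapshot(snap):
    """Pull out the commit time (UTC) and the Dagster-related summary fields."""
    summary = snap["summary"]
    return {
        "snapshot_id": snap["snapshot-id"],
        # timedelta keeps the millisecond value exact (no float division).
        "committed_at": EPOCH + datetime.timedelta(milliseconds=snap["timestamp-ms"]),
        "added_records": int(summary["added-records"]),
        # .get() because snapshots not written by Dagster may lack these keys.
        "dagster_run_id": summary.get("dagster_run_id"),
        "dagster_partition_key": summary.get("dagster_partition_key"),
    }


info = describe_snapshot(snapshot)
print(info["committed_at"].isoformat(), info["dagster_partition_key"])
```

For this snapshot the commit time falls on 2024-11-10 (UTC), one day after the `2024-11-09` partition it wrote.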

In [6]:
table_daily_air_quality_data.snapshots()

[Snapshot(snapshot_id=5902355183492091447, parent_snapshot_id=None, sequence_number=1, timestamp_ms=1731271332343, manifest_list='s3://warehouse/air_quality.db/daily_air_quality_data/metadata/snap-5902355183492091447-0-425c967c-e1d5-4c46-bac3-a89eb6b0e7fd.avro', summary=Summary(Operation.APPEND, **{'added-files-size': '32793', 'added-data-files': '1', 'added-records': '11235', 'changed-partition-count': '1', 'created_by': 'dagster', 'dagster_run_id': '9034ba5e-e09b-44de-ad9d-e9b2085ec7c0', 'dagster_partition_key': '2024-11-09', 'total-data-files': '1', 'total-delete-files': '0', 'total-records': '11235', 'total-files-size': '32793', 'total-position-deletes': '0', 'total-equality-deletes': '0'}), schema_id=0),
 Snapshot(snapshot_id=4207756199555223268, parent_snapshot_id=5902355183492091447, sequence_number=2, timestamp_ms=1731452819791, manifest_list='s3://warehouse/air_quality.db/daily_air_quality_data/metadata/snap-4207756199555223268-0-fd22a070-9630-4dcc-b3f0-148030fb9b16.avro', sum

In [8]:
import polars as pl

pdf = pl.scan_iceberg(
    table_daily_air_quality_data,
    snapshot_id=4207756199555223268
)

In [9]:
pdf.collect()

station_number,value,timestamp_measured,formula,measurement_date,__index_level_0__
str,f64,str,str,date,i64
"""NL10235""",0.4,"""2024-11-06T23:00:00+00:00""","""NO""",2024-11-06,0
"""NL10235""",15.74,"""2024-11-06T23:00:00+00:00""","""NO2""",2024-11-06,1
"""NL10235""",5.24,"""2024-11-06T23:00:00+00:00""","""O3""",2024-11-06,2
"""NL10235""",33.87,"""2024-11-06T23:00:00+00:00""","""PM10""",2024-11-06,3
"""NL10235""",0.77,"""2024-11-06T22:00:00+00:00""","""NO""",2024-11-06,4
…,…,…,…,…,…
"""NL49570""",36.5,"""2024-11-09T02:00:00+00:00""","""PM25""",2024-11-09,43
"""NL49570""",42.3,"""2024-11-09T01:00:00+00:00""","""PM10""",2024-11-09,44
"""NL49570""",35.2,"""2024-11-09T01:00:00+00:00""","""PM25""",2024-11-09,45
"""NL49570""",42.0,"""2024-11-09T00:00:00+00:00""","""PM10""",2024-11-09,46
