# Access the Iceberg catalog

This notebook shows you how to load Iceberg tables using [PyIceberg](https://github.com/apache/iceberg-python).

Note: before running this notebook, be sure to materialize the Dagster assets; otherwise, the tables will not be available.

## Connecting to the catalog

You can connect to the Polaris catalog as follows. The required credentials have been stored in the `/home/vscode/workspace/.pyiceberg.yaml` configuration file, which PyIceberg automatically finds because the `PYICEBERG_HOME` environment variable has been set in `.devcontainer/.env`.

In [None]:
from pyiceberg.catalog import load_catalog

catalog = load_catalog(
    name="dagster_example_catalog",
)
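PyIceberg reads catalog settings from a `.pyiceberg.yaml` file. It looks first in the directory named by `PYICEBERG_HOME` and then in your home directory. The sketch below mimics that lookup with the standard library so you can check which file would be picked up. `find_pyiceberg_config` is a hypothetical helper, not a PyIceberg function, and newer PyIceberg releases may check additional locations.

```python
import os
from pathlib import Path
from typing import Optional

CONFIG_FILE_NAME = ".pyiceberg.yaml"


def find_pyiceberg_config() -> Optional[Path]:
    """Return the first .pyiceberg.yaml found, or None if there is none.

    Sketch of the documented lookup order: the PYICEBERG_HOME directory
    first, then the user's home directory.
    """
    candidates = []
    pyiceberg_home = os.environ.get("PYICEBERG_HOME")
    if pyiceberg_home:
        candidates.append(Path(pyiceberg_home))
    candidates.append(Path.home())

    # Return the first candidate directory that actually contains the file
    for directory in candidates:
        path = directory / CONFIG_FILE_NAME
        if path.is_file():
            return path
    return None
```

Assuming `PYICEBERG_HOME` points at `/home/vscode/workspace` in this devcontainer, `find_pyiceberg_config()` should return `/home/vscode/workspace/.pyiceberg.yaml`.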

If you don't see a namespace here, be sure to run `just nc` from the repository root.

In [2]:
catalog.list_namespaces()

[('air_quality',)]
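The output shows that PyIceberg returns each namespace as a tuple of name parts rather than as a string. If you script this check, compare against a tuple. A minimal sketch; `has_namespace` is a hypothetical helper, not part of PyIceberg:

```python
from typing import Iterable, Tuple

Identifier = Tuple[str, ...]


def has_namespace(namespaces: Iterable[Identifier], name: str) -> bool:
    """Check whether a dotted namespace name appears in list_namespaces() output."""
    # Each namespace is a tuple of its levels, so "air_quality" becomes
    # ("air_quality",) and a nested "a.b" would become ("a", "b").
    target = tuple(name.split("."))
    return any(tuple(ns) == target for ns in namespaces)
```

For example, `has_namespace(catalog.list_namespaces(), "air_quality")` should be `True` once the namespace has been created.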

If you've materialized the `daily_air_quality_data` asset, you'll see it listed under the `air_quality` namespace.

In [3]:
catalog.list_tables(namespace="air_quality")

[('air_quality', 'daily_air_quality_data'),
 ('air_quality', 'daily_avg_air_quality_data')]
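Table identifiers are also tuples, of the form `(namespace, table_name)`. The cell below loads a table by its dotted name, so a small conversion is handy when you want to load every table in a namespace. This is a sketch with a hypothetical helper name; PyIceberg's `load_table` should also accept the tuple form directly.

```python
from typing import List, Tuple


def to_dotted_names(identifiers: List[Tuple[str, ...]]) -> List[str]:
    """Turn list_tables() identifiers into 'namespace.table' strings."""
    # Each identifier holds the namespace level(s) followed by the table name
    return [".".join(identifier) for identifier in identifiers]
```

For example: `{name: catalog.load_table(name) for name in to_dotted_names(catalog.list_tables(namespace="air_quality"))}`.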

You can load the table as follows:

In [4]:
table_daily_air_quality_data = catalog.load_table("air_quality.daily_air_quality_data")

table_daily_air_quality_data

daily_air_quality_data(
  1: station_number: optional string,
  2: value: optional double,
  3: timestamp_measured: optional string,
  4: formula: optional string,
  5: measurement_date: optional date
),
partition by: [measurement_date],
sort order: [],
snapshot: Operation.APPEND: id=5168509437393138058, parent_id=6774933075444546875, schema_id=0

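As you can see, the table is partitioned by the `measurement_date` column. This is because the `daily_air_quality_data` asset uses the daily partitions definition below, and its `partition_expr` metadata tells the I/O manager that each partition key corresponds to a value of `measurement_date`: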

```python
# src/dagster_pyiceberg_example/partitions.py
import datetime

from dagster import DailyPartitionsDefinition

daily_partition = DailyPartitionsDefinition(
    start_date=datetime.datetime(2024, 10, 20),
    end_offset=0,
    timezone="Europe/Amsterdam",
    fmt="%Y-%m-%d",
)

# src/dagster_pyiceberg_example/assets/__init__.py
from dagster import AssetIn, MultiToSingleDimensionPartitionMapping, asset

from dagster_pyiceberg_example.partitions import daily_partition

@asset(
    description="Copy air quality data to iceberg table",
    compute_kind="iceberg",
    io_manager_key="warehouse_io_manager",
    partitions_def=daily_partition,
    ins={
        "ingested_data": AssetIn(
            "air_quality_data",
            # NB: need this to control which downstream asset partitions are materialized
            partition_mapping=MultiToSingleDimensionPartitionMapping(
                partition_dimension_name="daily"
            ),
            input_manager_key="landing_zone_io_manager",
            # NB: Some partitions can fail because of 500 errors from API
            #  So we need to allow missing partitions
            metadata={"allow_missing_partitions": True},
        )
    },
    code_version="v1",
    group_name="measurements",
    metadata={
        # Tells the I/O manager which table column holds the partition key
        "partition_expr": "measurement_date",
    },
)
def daily_air_quality_data(ingested_data):
    ...
```
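Because `partition_expr` names `measurement_date`, each Dagster partition key corresponds to one value of that column. The sketch below makes the mapping explicit, assuming keys use the `fmt="%Y-%m-%d"` format shown above. The helper names are hypothetical, and this illustrates the idea rather than reproducing the I/O manager's actual code.

```python
import datetime

PARTITION_FMT = "%Y-%m-%d"  # same fmt as daily_partition above


def partition_key_to_date(partition_key: str) -> datetime.date:
    """Parse a Dagster daily partition key into a measurement_date value."""
    return datetime.datetime.strptime(partition_key, PARTITION_FMT).date()


def partition_row_filter(partition_key: str, column: str = "measurement_date") -> str:
    """Build a string row filter that selects a single daily partition."""
    day = partition_key_to_date(partition_key)
    return f"{column} = '{day.isoformat()}'"
```

You could then read a single partition with something like `table_daily_air_quality_data.scan(row_filter=partition_row_filter("2024-11-20")).to_arrow()`, relying on PyIceberg parsing the string filter and converting the literal to a date.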

Each snapshot carries a summary with metadata about the commit that created it. Snapshots written by Dagster also record the run ID (`dagster-run-id`) and partition key (`dagster-partition-key`) that produced them.

In [6]:
table_daily_air_quality_data.snapshots()[0].model_dump()

{'snapshot-id': 4597381090457490275,
 'sequence-number': 1,
 'timestamp-ms': 1732195883036,
 'manifest-list': 'file:///tmp/dagster_example_catalog/air_quality/daily_air_quality_data/metadata/snap-4597381090457490275-0-83501f5b-c0bf-4af2-ab38-775e61e88b6a.avro',
 'summary': {'operation': 'append',
  'added-files-size': '3903',
  'added-data-files': '1',
  'added-records': '612',
  'changed-partition-count': '1',
  'created-by': 'dagster',
  'dagster-run-id': 'd77731e4-0551-4d25-b58e-1f72cbcaaeba',
  'pyiceberg-version': '0.8.0',
  'dagster-pyiceberg-version': '0.1.3',
  'dagster-partition-key': '2024-11-20',
  'total-data-files': '1',
  'total-delete-files': '0',
  'total-records': '612',
  'total-files-size': '3903',
  'total-position-deletes': '0',
  'total-equality-deletes': '0'},
 'schema-id': 0}
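Since the partition key is stored in the summary, you can look up which snapshots wrote a given partition. A minimal sketch that works on the dictionaries returned by `model_dump()` (with the hyphenated keys shown above); `snapshot_ids_for_partition` is a hypothetical helper:

```python
from typing import Any, Dict, Iterable, List


def snapshot_ids_for_partition(
    snapshots: Iterable[Dict[str, Any]], partition_key: str
) -> List[int]:
    """Return ids of snapshots whose summary records the given Dagster partition key."""
    return [
        snap["snapshot-id"]
        for snap in snapshots
        # Snapshots not written by Dagster may lack this summary entry
        if snap.get("summary", {}).get("dagster-partition-key") == partition_key
    ]
```

For example: `snapshot_ids_for_partition((s.model_dump() for s in table_daily_air_quality_data.snapshots()), "2024-11-20")`.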

You can use Polars, for example, to load a particular snapshot:

In [14]:
import polars as pl

snapshots = table_daily_air_quality_data.snapshots()

pdf = pl.scan_iceberg(
    table_daily_air_quality_data,
    snapshot_id=snapshots[0].snapshot_id
)

In [15]:
pdf.collect()

| station_number | value | timestamp_measured | formula | measurement_date |
|---|---|---|---|---|
| str | f64 | str | str | date |
| "NL01485" | 0.9 | "2024-11-20T23:00:00+00:00" | "NO" | 2024-11-20 |
| "NL01485" | 17.5 | "2024-11-20T23:00:00+00:00" | "NO2" | 2024-11-20 |
| "NL01485" | 0.3 | "2024-11-20T23:00:00+00:00" | "SO2" | 2024-11-20 |
| "NL01485" | 51.3 | "2024-11-20T23:00:00+00:00" | "O3" | 2024-11-20 |
| "NL01485" | 0.23 | "2024-11-20T23:00:00+00:00" | "C6H6" | 2024-11-20 |
| … | … | … | … | … |
| "NL01496" | 0.16 | "2024-11-20T00:00:00+00:00" | "C6H6" | 2024-11-20 |
| "NL01496" | 0.08 | "2024-11-20T00:00:00+00:00" | "C7H8" | 2024-11-20 |
| "NL01496" | 3.2 | "2024-11-20T00:00:00+00:00" | "PM25" | 2024-11-20 |
| "NL01496" | 11.5 | "2024-11-20T00:00:00+00:00" | "PM10" | 2024-11-20 |
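Instead of picking a snapshot by list position, you may want the snapshot that was current at a given moment (time travel). The sketch below is a simplified version of that idea: it compares the `timestamp-ms` values from `model_dump()` and ignores branches and snapshot ancestry, which a full implementation would follow through the table's snapshot log. `snapshot_id_as_of` is a hypothetical helper.

```python
from typing import Any, Dict, Iterable, Optional


def snapshot_id_as_of(
    snapshots: Iterable[Dict[str, Any]], timestamp_ms: int
) -> Optional[int]:
    """Return the id of the newest snapshot committed at or before timestamp_ms.

    Simplified: only commit timestamps are compared; branches and
    parent/child relationships are ignored.
    """
    eligible = [s for s in snapshots if s["timestamp-ms"] <= timestamp_ms]
    if not eligible:
        return None  # the table had no snapshot yet at that time
    return max(eligible, key=lambda s: s["timestamp-ms"])["snapshot-id"]
```

Check for `None` before passing the result as `snapshot_id` to `pl.scan_iceberg`, since omitting a snapshot id reads the current snapshot instead.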
