## Introduction

In my [last post](https://binayakd.tech/posts/2024-08-30-exploring-iceberg/), I explored the fundamentals of creating Apache Iceberg tables with various catalogs, and of using Spark and Trino to write data into and read data from those tables. That involved using Spark as the Iceberg client to write data into the Iceberg table.

However, when the data is already in object storage, following this process to create Iceberg tables would involve a full migration (read, write, delete) of the data, which can prove time-consuming and costly for large datasets.

What we need is a workflow similar to [Hive's External tables](https://cwiki.apache.org/confluence/display/Hive/Managed+vs.+External+Tables), where writing and updating the data is handled by an external process (or a preexisting pipeline), and the Iceberg table acts as the metadata layer that allows the data to be queried.

This very problem has been addressed before in [this article](https://medium.com/inquery-data/registering-s3-files-into-apache-iceberg-tables-without-the-rewrites-3c087cb01658). However, that article is over a year old at the time of writing, and it uses the Iceberg Java APIs, which proved to be somewhat cumbersome.

Fortunately, PyIceberg has come to the rescue with a more straightforward way to achieve this. Specifically, we can use the [`add_files`](https://py.iceberg.apache.org/api/#add-fields) method to register Parquet files to an Iceberg table without rewrites.

In this post, I will essentially be following the PyIceberg [Getting started tutorial](https://py.iceberg.apache.org/), with two differences: I will be using Minio as the object storage, and I will use the `add_files` method instead of appending (writing) the data.

For this we need to set up Minio, and Postgres as the backend for the Iceberg SQL catalog, both of which we can conveniently start using a Docker Compose file (found in this repo). You can of course also just use files in the local file system and a SQLite-backed catalog, but that does not properly show the benefit of this workflow, which is being able to migrate existing data in object storage to the Iceberg format without expensive rewrites.

## Test data setup
We will be using the classic NYC Taxi dataset for these tests. We download the file for January 2024 and save it to our local filesystem, in the `test-data` folder.

In [None]:
!curl https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-01.parquet -o ./local-data/test-data/yellow_tripdata_2024-01.parquet

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 47.6M  100 47.6M    0     0  4217k      0  0:00:11  0:00:11 --:--:-- 5225k


Then we will simulate a data generation process, such as an ELT pipeline, that uploads the data to our Minio instance. We will use Polars here, but we could just as easily use something like Spark or Pandas.

> Note: There is a bug in PyIceberg 0.8.0 where the `add_files` method raises an exception if the Parquet file does not have column statistics. It just so happens that the NYC Taxi dataset Parquet files do not have column statistics. This is another reason why we read the file with Polars and re-write it to Minio instead of uploading the file directly, as Polars adds the statistics by default.
>
> This should be fixed in PyIceberg 0.8.1, with the merge of [this pull request](https://github.com/apache/iceberg-python/pull/1354).

First we read the file from the local file system into a Polars DataFrame:

In [62]:
import polars as pl

df = pl.read_parquet("./local-data/test-data/yellow_tripdata_2024-01.parquet")

We now need to downcast the nanosecond timestamp columns, as PyIceberg only supports timestamp precision down to microseconds. PyIceberg can do this casting automatically through a [configuration setting or environment variable](https://py.iceberg.apache.org/configuration/#nanoseconds-support), but this only works when writing to the Iceberg table directly, not when adding existing files.

Thus this has to be done manually. We first check which columns need casting by getting the schema:

In [63]:
df.schema

Schema([('VendorID', Int32),
        ('tpep_pickup_datetime', Datetime(time_unit='ns', time_zone=None)),
        ('tpep_dropoff_datetime', Datetime(time_unit='ns', time_zone=None)),
        ('passenger_count', Int64),
        ('trip_distance', Float64),
        ('RatecodeID', Int64),
        ('store_and_fwd_flag', String),
        ('PULocationID', Int32),
        ('DOLocationID', Int32),
        ('payment_type', Int64),
        ('fare_amount', Float64),
        ('extra', Float64),
        ('mta_tax', Float64),
        ('tip_amount', Float64),
        ('tolls_amount', Float64),
        ('improvement_surcharge', Float64),
        ('total_amount', Float64),
        ('congestion_surcharge', Float64),
        ('Airport_fee', Float64)])

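From here we see that the columns `tpep_pickup_datetime` and `tpep_dropoff_datetime` are of type `Datetime` with time unit "ns", so those are the ones that need to be cast. Note that the cell below uses `"ms"`, which means milliseconds in Polars; this is coarser than strictly required, and `"us"` would be the microsecond equivalent.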

In [64]:
df = df.with_columns(pl.col("tpep_pickup_datetime").dt.cast_time_unit("ms"))
df = df.with_columns(pl.col("tpep_dropoff_datetime").dt.cast_time_unit("ms"))

We check the schema again:

In [65]:
df.schema

Schema([('VendorID', Int32),
        ('tpep_pickup_datetime', Datetime(time_unit='ms', time_zone=None)),
        ('tpep_dropoff_datetime', Datetime(time_unit='ms', time_zone=None)),
        ('passenger_count', Int64),
        ('trip_distance', Float64),
        ('RatecodeID', Int64),
        ('store_and_fwd_flag', String),
        ('PULocationID', Int32),
        ('DOLocationID', Int32),
        ('payment_type', Int64),
        ('fare_amount', Float64),
        ('extra', Float64),
        ('mta_tax', Float64),
        ('tip_amount', Float64),
        ('tolls_amount', Float64),
        ('improvement_surcharge', Float64),
        ('total_amount', Float64),
        ('congestion_surcharge', Float64),
        ('Airport_fee', Float64)])
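To illustrate what such a downcast does to an individual value, here is a minimal sketch in plain Python. It assumes timestamps are stored as integer ticks since the Unix epoch and that the downcast simply truncates the extra digits for post-1970 values; the helper `downcast_ns` and the sample value are made up for illustration and are not part of Polars or PyIceberg.

```python
# Conversion factors between nanoseconds and the coarser units.
NS_PER_US = 1_000
NS_PER_MS = 1_000_000


def downcast_ns(epoch_ns: int, target: str) -> int:
    """Truncate an epoch-nanosecond value to 'us' or 'ms' ticks (hypothetical helper)."""
    divisor = {"us": NS_PER_US, "ms": NS_PER_MS}[target]
    return epoch_ns // divisor


# A made-up pickup time with full nanosecond precision.
pickup_ns = 1_704_067_200_123_456_789

print(downcast_ns(pickup_ns, "us"))  # 1704067200123456 (drops the last 3 digits)
print(downcast_ns(pickup_ns, "ms"))  # 1704067200123 (drops the last 6 digits)
```

Either unit fits within microsecond precision; the choice only determines how much sub-second detail is kept.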

There is one more update we need to make to the data. In my [previous post](https://binayakd.tech/posts/2024-08-30-exploring-iceberg/#writing-the-data-to-iceberg-table), we found out that although this file is marked for 2024-01, it actually contains some stray rows from other months. We need to remove those rows, as they will cause issues when we try to add this file to an Iceberg table partitioned by month.

This is because adding files does not modify the files themselves: the process cannot split one file into separate per-partition Parquet files, and it cannot register a single file under multiple partitions.

We can use Polars to do this filtering:

In [66]:
df = df.filter(
    (pl.col("tpep_pickup_datetime").dt.year() == 2024) & (pl.col("tpep_pickup_datetime").dt.month() == 1)
)

And we check if the filtering worked:

In [67]:
(df
 .with_columns(pl.col("tpep_pickup_datetime").dt.year().alias("year"))
 .with_columns(pl.col("tpep_pickup_datetime").dt.month().alias("month"))
 .unique(subset=["year", "month"])
 .select(['year', 'month'])
)

year,month
i32,i8
2024,1
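To make the single-partition constraint concrete, here is a minimal sketch in plain Python. It assumes the Iceberg month transform maps a timestamp to the number of months since 1970-01, as the Iceberg spec describes; the helpers `month_partition` and `partitions_in_file` and the sample timestamps are illustrative and not part of PyIceberg.

```python
from datetime import datetime


def month_partition(ts: datetime) -> int:
    """Months elapsed since 1970-01 (assumed definition of the month transform)."""
    return (ts.year - 1970) * 12 + (ts.month - 1)


def partitions_in_file(timestamps) -> set:
    """Distinct month partitions that the rows of one file would fall into."""
    return {month_partition(ts) for ts in timestamps}


# Made-up pickup times: one clean file, one with a stray December row.
clean = [datetime(2024, 1, 1, 0, 5), datetime(2024, 1, 31, 23, 59)]
stray = clean + [datetime(2023, 12, 31, 23, 58)]

print(sorted(partitions_in_file(clean)))  # [648]
print(sorted(partitions_in_file(stray)))  # [647, 648]
```

A file can only be registered as-is when this set has exactly one element, which is why the stray rows have to be filtered out before the file is written.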


We can now write the data to Minio. For that, we first set up the storage options for Minio:

In [68]:
import s3fs

conn_data = { 
    'key': 'admin', 
    'secret': 'password', 
    'client_kwargs': { 
        'endpoint_url': 'http://localhost:9000' 
        }
}
fs = s3fs.S3FileSystem(**conn_data)


And finally write it to our desired bucket and location, with statistics enabled:

In [69]:
s3_path = "s3://warehouse/data/yellow_tripdata_2024-01.parquet"

with fs.open(s3_path, "wb") as f:
    df.write_parquet(f, statistics=True)



## Creating an SQL Catalog
As mentioned, we will be creating an SQL catalog, using the Postgres instance as the database backend. We also include the Minio connection details for the warehouse location. These should point to the object storage instance that contains the preexisting files we want to add to the Iceberg tables.

In [70]:
from pyiceberg.catalog.sql import SqlCatalog

catalog = SqlCatalog(
    "default",
    **{
        "uri": "postgresql+psycopg2://postgres:postgres@localhost:5432/postgres",
        "warehouse": "s3://warehouse/iceberg",
        "s3.endpoint": "http://localhost:9000",
        "s3.access-key-id": "admin",
        "s3.secret-access-key": "password"
    }
)
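Hardcoded credentials are fine for a local Docker setup, but outside of one you would probably want to read them from the environment. Here is a minimal sketch of that idea. The environment variable names (`ICEBERG_CATALOG_URI`, `ICEBERG_WAREHOUSE`, `MINIO_ENDPOINT`, `MINIO_ACCESS_KEY`, `MINIO_SECRET_KEY`) and the helper `catalog_properties` are assumptions made up for this example; only the property keys come from the cell above.

```python
import os


def catalog_properties(env=os.environ) -> dict:
    """Build the SqlCatalog properties from (hypothetical) environment variables.

    Falls back to the local Docker defaults used in this post.
    """
    return {
        "uri": env.get(
            "ICEBERG_CATALOG_URI",
            "postgresql+psycopg2://postgres:postgres@localhost:5432/postgres",
        ),
        "warehouse": env.get("ICEBERG_WAREHOUSE", "s3://warehouse/iceberg"),
        "s3.endpoint": env.get("MINIO_ENDPOINT", "http://localhost:9000"),
        "s3.access-key-id": env.get("MINIO_ACCESS_KEY", "admin"),
        "s3.secret-access-key": env.get("MINIO_SECRET_KEY", "password"),
    }


# Demonstration with an explicit mapping instead of the real environment.
props = catalog_properties({"MINIO_ENDPOINT": "http://minio:9000"})
print(props["s3.endpoint"])  # http://minio:9000

# The result could then be passed on as before:
# catalog = SqlCatalog("default", **props)
```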

## Creating the Iceberg Table

Now that we have our catalog set up, we first need to create the table with a defined schema.
This schema can be obtained directly from the Parquet file, using PyArrow.

First we create a filesystem object to let PyArrow know how to connect to Minio:

In [71]:
import pyarrow.parquet as pq
# Aliased so it does not shadow the s3fs `fs` object created earlier
from pyarrow import fs as pa_fs


minio = pa_fs.S3FileSystem(
    endpoint_override='localhost:9000',
    access_key="admin",
    secret_key="password",
    scheme="http"
)


Then we read the file as a PyArrow table, passing the bucket and path along with the Minio filesystem:

In [72]:
df = pq.read_table(
    "warehouse/data/yellow_tripdata_2024-01.parquet",
    filesystem=minio
)

We can check what the schema actually looks like, to ensure it matches what we wrote before:

In [73]:
df.schema

VendorID: int32
tpep_pickup_datetime: timestamp[ms]
tpep_dropoff_datetime: timestamp[ms]
passenger_count: int64
trip_distance: double
RatecodeID: int64
store_and_fwd_flag: large_string
PULocationID: int32
DOLocationID: int32
payment_type: int64
fare_amount: double
extra: double
mta_tax: double
tip_amount: double
tolls_amount: double
improvement_surcharge: double
total_amount: double
congestion_surcharge: double
Airport_fee: double

We now have enough setup to create the namespace and table.

Creating the namespace:

In [74]:
catalog.create_namespace("nyc_taxi_data")

And then the table:

In [75]:
table = catalog.create_table(
    "nyc_taxi_data.yellow_tripdata",
    schema=df.schema
)

Now we add the partition field (column) by using `MonthTransform` on the `tpep_pickup_datetime` column, to have the data partitioned by month.

In [76]:
from pyiceberg.transforms import MonthTransform

with table.update_spec() as update_spec:
    update_spec.add_field(
        source_column_name="tpep_pickup_datetime",
        transform=MonthTransform(),
        partition_field_name="tpep_pickup_datetime_month"
    )





## Adding Parquet File to Table

Now that we have created the table with its partition field, we can finally add the Parquet file to it. First we reload the table reference by name, in case this step needs to be re-run, since the `create_table` method cannot be run multiple times for the same table.

In [77]:
table = catalog.load_table("nyc_taxi_data.yellow_tripdata")
    

Now we use the `add_files` method to add the file. Since this method takes a list, we pass a list containing our one file:

In [78]:
table.add_files(["s3://warehouse/data/yellow_tripdata_2024-01.parquet"])
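Since `add_files` takes a list, registering several months at once mostly comes down to building that list. Below is a minimal sketch in plain Python. It assumes the other monthly files follow the same naming pattern under the same prefix and have each been prepared as above (a single month per file, downcast timestamps, column statistics); the helper `monthly_file_paths` is made up for illustration.

```python
def monthly_file_paths(year: int, months, prefix: str = "s3://warehouse/data") -> list:
    """Build one object-storage path per month, following this post's file naming."""
    return [
        f"{prefix}/yellow_tripdata_{year}-{month:02d}.parquet"
        for month in months
    ]


paths = monthly_file_paths(2024, range(1, 4))

# Cheap sanity check before registering: no path listed twice.
assert len(paths) == len(set(paths))

for path in paths:
    print(path)

# The list could then be registered in one call:
# table.add_files(paths)
```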



In [None]:
!curl https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-02.parquet -o ./local-data/warehouse/bronze/yellow_tripdata_2024-02.parquet