In [None]:
!pip install pyiceberg

In [18]:
import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.fs as fs
import pyarrow.dataset as ds
from pyiceberg.catalog import load_catalog
from pyiceberg.schema import Schema, NestedField
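from pyiceberg.types import (BinaryType, DoubleType, FloatType, IntegerType,
                             LongType, StringType, TimestampType, TimestamptzType)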

## **Connect to the Nessie REST catalog**

We initialize a connection to the **Nessie Catalog** through its REST API.

This catalog acts as the **metadata management layer** for Iceberg tables.  
All table versions, namespaces, and schema changes will be tracked here.

We verify the connection by listing existing namespaces.


In [3]:
# Configure the connection to the Nessie REST catalog
catalog = load_catalog(
    "nessie",
    **{
        "uri": "http://nessie:19120/iceberg/main/",
    }
)

# Verify the connection by listing the namespaces
namespaces = catalog.list_namespaces()
print("Namespaces:", namespaces)

Namespaces: []


### **Create a new namespace in Nessie**

Since no namespaces were found, we create a new one named **`taxis-project`**  
to organize our Iceberg tables under this logical grouping.

In [None]:
catalog.create_namespace("taxis-project")
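
Running the cell above a second time is expected to fail, because the catalog rejects a namespace that already exists. A minimal sketch of an idempotent variant follows. `ensure_namespace` is a hypothetical helper (not part of PyIceberg) that relies only on the `list_namespaces()` and `create_namespace()` calls already used in this notebook, and it only considers top-level namespaces.

```python
def ensure_namespace(catalog, name: str) -> bool:
    """Create `name` only if the catalog does not list it yet.

    Returns True when the namespace was created, False when it already existed.
    `catalog` is any object exposing list_namespaces() and create_namespace().
    """
    # list_namespaces() returns tuples such as ('taxis-project',)
    existing = {tuple(ns) for ns in catalog.list_namespaces()}
    if (name,) in existing:
        return False
    catalog.create_namespace(name)
    return True
```

With the catalog from this notebook, the call would be `ensure_namespace(catalog, "taxis-project")`.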

We confirm that the `taxis-project` namespace was successfully created by listing all available namespaces again.


In [None]:
# List the namespaces
namespaces = catalog.list_namespaces()
print("Namespaces:", namespaces)

Namespaces: [('taxis-project',)]


## **Connect to the MinIO storage (S3-compatible)**

We configure access to **MinIO**, which acts as an S3-compatible object storage.  
This connection allows us to browse and read Parquet files that were uploaded in the previous step.


In [19]:
s3 = fs.S3FileSystem(
    access_key="admin",
    secret_key="password",
    endpoint_override="http://minio:9000",
    region="us-east-1"
)

We specify the folder containing the previously ingested dataset and list all the files available in that location.

- Only `.parquet` files are kept; any other entries are filtered out.
- If none are found, an exception is raised.
- The first available Parquet file is selected for conversion to Iceberg format.


In [20]:
# Base folder in the bucket
folder_path = "taxis/taxis_parquet/df_data/"

# List the files in the folder
files = s3.get_file_info(fs.FileSelector(folder_path))

# Filter only Parquet files
parquet_files = [f.path for f in files if f.is_file and f.path.endswith(".parquet")]

if not parquet_files:
    raise FileNotFoundError(f"No .parquet files found in {folder_path}")

# Use the first Parquet file found
path = parquet_files[0]

print("File found:", path)

File found: taxis/taxis_parquet/df_data/1761058478.437103.46027fda46.parquet
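
Taking element `[0]` depends on the order in which the listing returns the files, which we do not control. As a small sketch, assuming a reproducible choice is wanted, the paths can be sorted before one is picked. `select_parquet_paths` and the sample paths below are hypothetical and exist only for illustration.

```python
def select_parquet_paths(paths):
    """Return the .parquet paths in sorted order so the choice is reproducible."""
    return sorted(p for p in paths if p.endswith(".parquet"))


# Hypothetical listing; in the notebook these would be the `f.path` values.
listing = [
    "taxis/taxis_parquet/df_data/_SUCCESS",
    "taxis/taxis_parquet/df_data/b.parquet",
    "taxis/taxis_parquet/df_data/a.parquet",
]

candidates = select_parquet_paths(listing)
first_path = candidates[0]
print("File selected:", first_path)
```

Sorting is lexicographic, so it only reflects creation order if the file names happen to encode it.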


### **Read the Parquet file into a PyArrow Table**

We create a **PyArrow dataset** from the selected Parquet file using the S3 filesystem.  
Then we load it into memory as an Arrow `Table`; its schema will later be mapped to an Iceberg schema.


In [None]:
dataset = ds.dataset(path, filesystem=s3, format="parquet")

In [None]:
table = dataset.to_table()

We print the schema of the loaded table to understand its structure and data types.  
This step is necessary to define an equivalent schema for the Iceberg table.


In [None]:
table_schema = table.schema
print(table_schema)

vendor_id: int32
tpep_pickup_datetime: timestamp[us]
tpep_dropoff_datetime: timestamp[us]
passenger_count: double
trip_distance: double
ratecode_id: double
store_and_fwd_flag: string
pu_location_id: int32
do_location_id: int32
payment_type: int64
fare_amount: double
extra: double
mta_tax: double
tip_amount: double
tolls_amount: double
improvement_surcharge: double
total_amount: double
congestion_surcharge: double
airport_fee: double
cbd_congestion_fee: double


## **Define a conversion function: Arrow → Iceberg Schema**

We implement a helper function `arrow_to_iceberg()` that converts a PyArrow schema  
into a valid Iceberg schema by mapping each Arrow type to its Iceberg equivalent.

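This function iterates through all fields and creates a list of `NestedField` objects,  
assigning sequential field IDs starting at 1 and marking a field as required when the Arrow field is not nullable.  
Arrow types without a mapping raise a `ValueError`.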

In [None]:
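def arrow_to_iceberg(arrow_schema: pa.Schema) -> Schema:
    """Map a flat PyArrow schema to an Iceberg Schema, one NestedField per column."""
    fields = []
    field_id = 1  # IDs are assigned sequentially, starting at 1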

    for field in arrow_schema:
        name = field.name
        arrow_type = field.type

        if pa.types.is_int64(arrow_type):
            iceberg_type = LongType()
        elif pa.types.is_int32(arrow_type):
            iceberg_type = IntegerType()
        elif pa.types.is_float32(arrow_type):
            iceberg_type = FloatType()
        elif pa.types.is_float64(arrow_type):
            iceberg_type = DoubleType()
        elif pa.types.is_string(arrow_type):
            iceberg_type = StringType()
        elif pa.types.is_binary(arrow_type):
            iceberg_type = BinaryType()
        elif pa.types.is_timestamp(arrow_type):
            if arrow_type.tz:
                iceberg_type = TimestamptzType()
            else:
                iceberg_type = TimestampType()
        else:
            raise ValueError(f"Arrow type not supported yet: {arrow_type}")

        fields.append(NestedField(field_id, name, iceberg_type, required=not field.nullable))
        field_id += 1

    return Schema(*fields)

We call `arrow_to_iceberg()` to automatically generate an Iceberg-compatible schema  
based on the structure of the original Parquet file.


In [None]:
post_schema = arrow_to_iceberg(table_schema)

### **Create the Iceberg table in Nessie**

We create a new Iceberg table named **`taxis-project.taxis`** in the Nessie catalog,  
using the previously generated schema.

If the table already exists, creation is skipped so that re-running the cell does not fail.

In [None]:
if not catalog.table_exists("taxis-project.taxis"):
    catalog.create_table("taxis-project.taxis", schema=post_schema)
else:
    print("The table taxis-project.taxis already exists, it was not created again.")

### **Append Parquet data to the Iceberg table**

Finally, we load the Iceberg table from the catalog and **append** the Parquet data.  
This writes the data into MinIO in the Iceberg table layout and records the change in Nessie,  
so the table can later benefit from schema evolution, partitioning, and metadata tracking.

In [None]:
taxis = catalog.load_table("taxis-project.taxis")
taxis.append(table)
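
To confirm that the append took effect, the row count can be compared before and after the write. The sketch below is one way to do it, as a replacement for the plain `append` call above rather than in addition to it (otherwise the data would be written twice). `append_and_check` is a hypothetical helper; it assumes the table is small enough for `scan().to_arrow()` to fit in memory.

```python
def append_and_check(iceberg_table, arrow_table) -> int:
    """Append arrow_table and return the number of rows the table gained.

    Raises RuntimeError if the gain differs from arrow_table.num_rows.
    """
    # scan().to_arrow() materializes the whole table in memory; this is an
    # assumption that suits a demo table but may not suit larger data.
    rows_before = iceberg_table.scan().to_arrow().num_rows
    iceberg_table.append(arrow_table)
    rows_after = iceberg_table.scan().to_arrow().num_rows

    gained = rows_after - rows_before
    if gained != arrow_table.num_rows:
        raise RuntimeError(
            f"Expected {arrow_table.num_rows} new rows, found {gained}"
        )
    return gained
```

With the objects from this notebook, the call would be `append_and_check(taxis, table)`.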