In [None]:
### Create Iceberg namespace

In [None]:
import os

# Runtime configuration; each value may be overridden from the environment.
# CATALOG_NAME selects the pyiceberg catalog (and the Trino catalog queried later);
# LAKEHOUSE_DATA_PATH is the root under which the `datasets/` parquet files are read.
CATALOG_NAME = os.environ.get("CATALOG_NAME", "lakehouse-poc")
LAKEHOUSE_DATA_PATH = os.environ.get("LAKEHOUSE_DATA_PATH", "/lakehouse-poc")

In [None]:
from pyiceberg.catalog import load_catalog
from pyiceberg.table import Table
from pyiceberg.schema import Schema
from pyiceberg.types import DoubleType, StringType, TimestampType, NestedField, IntegerType, BooleanType, LongType
from pyiceberg.partitioning import PartitionSpec, PartitionField
from pyiceberg.transforms import YearTransform, MonthTransform, DayTransform, IdentityTransform
from pyiceberg.table.sorting import SortOrder, SortField

# Catalog connection plus the identifiers used throughout the notebook.
catalog = load_catalog(name=CATALOG_NAME)
NAMESPACE = "jupyter"
TABLE_NAME = "nyc_taxi_yellow_tripdata"
FULL_TABLE_NAME = ".".join((NAMESPACE, TABLE_NAME))


def create_namespace(ns):
    """(Re)create the Iceberg namespace ``ns`` in the global ``catalog``.

    If ``ns`` already exists as a top-level namespace, every table listed in it
    is dropped, then the namespace itself is dropped. In all cases the namespace
    is then created with fixed ``customer_name`` / ``owner`` properties.

    NOTE(review): tables are removed with ``catalog.drop_table`` rather than
    ``purge_table``; data files may therefore remain in storage. Confirm this is intended.
    """
    top_level_names = set()
    for ns_identifier in catalog.list_namespaces():
        top_level_names.add(ns_identifier[0])

    if ns in top_level_names:
        print(f"namespace {ns} exists. Clearing data...")
        for table_identifier in catalog.list_tables(namespace=ns):
            catalog.drop_table(table_identifier)
        catalog.drop_namespace(namespace=ns)

    namespace_properties = {"customer_name": ns, "owner": "blahblah@lakehouse.com"}
    catalog.create_namespace(namespace=ns, properties=namespace_properties)
    print(f"database created for namespace <{ns}>")


create_namespace(NAMESPACE)

### Create Iceberg table

In [None]:
# Iceberg schema mirroring the columns of the yellow-taxi parquet extract.
# Every field is optional; field IDs are assigned sequentially starting at 1.
columns = [
    ("VendorID", IntegerType()),
    ("tpep_pickup_datetime", TimestampType()),
    ("tpep_dropoff_datetime", TimestampType()),
    ("passenger_count", LongType()),
    ("trip_distance", DoubleType()),
    ("RatecodeID", LongType()),
    ("store_and_fwd_flag", StringType()),
    ("PULocationID", IntegerType()),
    ("DOLocationID", IntegerType()),
    ("payment_type", LongType()),
    ("fare_amount", DoubleType()),
    ("extra", DoubleType()),
    ("mta_tax", DoubleType()),
    ("tip_amount", DoubleType()),
    ("tolls_amount", DoubleType()),
    ("improvement_surcharge", DoubleType()),
    ("total_amount", DoubleType()),
    ("congestion_surcharge", DoubleType()),
    ("Airport_fee", DoubleType()),
]
iceberg_fields = [
    NestedField(field_id=field_id, name=name, field_type=field_type, required=False)
    for field_id, (name, field_type) in enumerate(columns, start=1)
]
iceberg_schema = Schema(*iceberg_fields)

# Partition by the day of tpep_pickup_datetime. The source id is looked up from
# the schema rather than hard-coded. Partition field IDs start at 1000 so they
# never collide with schema field IDs.
pickup_field_id = iceberg_schema.find_field("tpep_pickup_datetime").field_id
partition_spec = PartitionSpec(
    PartitionField(
        source_id=pickup_field_id,
        field_id=1000,
        transform=DayTransform(),
        name="tpep_pickup_datetime_day",
    ),
)

# Create the table. The partition spec must be passed explicitly; without it
# the table is created unpartitioned.
# NOTE(review): appending to a table partitioned with a time transform needs a
# pyiceberg release with partitioned-write support (0.7+). Confirm the installed version.
# NOTE(review): the location is namespace-level, so a second table in this
# namespace would share it. Consider appending TABLE_NAME.
table = catalog.create_table(
    identifier=FULL_TABLE_NAME,
    location=f"s3a://warehouse/{NAMESPACE}",
    schema=iceberg_schema,
    partition_spec=partition_spec,
)

### Validate namespace and table have been created with pyiceberg

In [None]:
from pyiceberg.catalog import load_catalog
# Re-open the *same* catalog by name. Calling load_catalog() without a name falls
# back to the default catalog from pyiceberg's configuration, which is not
# necessarily CATALOG_NAME. That would make this check inspect the wrong catalog.
catalog = load_catalog(name=CATALOG_NAME)

ns_names = {ns_identifier[0] for ns_identifier in catalog.list_namespaces()}
assert NAMESPACE in ns_names, f"namespace {NAMESPACE!r} not found in catalog {CATALOG_NAME!r}"

table_names = {table_identifier[1] for table_identifier in catalog.list_tables(NAMESPACE)}
assert TABLE_NAME in table_names, f"table {TABLE_NAME!r} not found in namespace {NAMESPACE!r}"

### Load data using duckdb

In [None]:
import duckdb

# January 2024 yellow-taxi trip records, read with DuckDB and converted to a
# PyArrow table (`arrow_table`) for the Iceberg append.
data_file = "/".join([LAKEHOUSE_DATA_PATH, "datasets", "yellow_tripdata_2024-01.parquet"])
duckdb_data = duckdb.read_parquet(data_file)
arrow_table = duckdb_data.to_arrow_table()

### Save data into Iceberg table

In [None]:
# Append the Arrow table read with DuckDB to the Iceberg table.
table.append(df=arrow_table)

### Query ingested data with Trino client

In [None]:
from trino.dbapi import connect
# Connection settings: defaults match the original notebook; each can be
# overridden through the environment.
TRINO_HOST = os.getenv("TRINO_HOST", "trino")
TRINO_PORT = int(os.getenv("TRINO_PORT", "8080"))
TRINO_USER = os.getenv("TRINO_USER", "yooo")

# Expected results for the yellow_tripdata_2024-01 extract.
EXPECTED_TOTAL_ROWS = 2964624
EXPECTED_JAN_2_FIRST_HALF_HOUR_TRIPS = 523
EXPECTED_JAN_1_FIRST_HALF_HOUR_TRIPS = 2717

conn = connect(
    host=TRINO_HOST,
    port=TRINO_PORT,
    user=TRINO_USER,
    catalog=CATALOG_NAME,
    schema=NAMESPACE,
)
cur = conn.cursor()
try:
    # Total row count of the ingested table. The query runs once; the original
    # re-ran it and discarded the result.
    cur.execute(f"SELECT COUNT(*) FROM {FULL_TABLE_NAME}")
    row_count = cur.fetchall()[0][0]
    assert row_count == EXPECTED_TOTAL_ROWS, row_count

    # Trips picked up on Jan 2 between 00:00:00 and 00:30:00 (both bounds inclusive).
    query = f"""
SELECT * FROM {FULL_TABLE_NAME} 
WHERE tpep_pickup_datetime >= parse_datetime('2024-01-02 00:00:00', 'YYYY-MM-dd HH:mm:ss') AND
      tpep_pickup_datetime <= parse_datetime('2024-01-02 00:30:00', 'YYYY-MM-dd HH:mm:ss')
"""
    cur.execute(query)
    rows = cur.fetchall()
    assert len(rows) == EXPECTED_JAN_2_FIRST_HALF_HOUR_TRIPS, len(rows)

    # Trip count on Jan 1 between 00:00:00 and 00:30:00 (both bounds inclusive).
    query = f"""
SELECT COUNT(*) FROM {FULL_TABLE_NAME} 
WHERE tpep_pickup_datetime >= parse_datetime('2024-01-01 00:00:00', 'YYYY-MM-dd HH:mm:ss') AND
      tpep_pickup_datetime <= parse_datetime('2024-01-01 00:30:00', 'YYYY-MM-dd HH:mm:ss')
"""
    cur.execute(query)
    row_count = cur.fetchall()[0][0]
    assert row_count == EXPECTED_JAN_1_FIRST_HALF_HOUR_TRIPS, row_count
finally:
    # Release the Trino cursor and connection even if an assertion fails.
    cur.close()
    conn.close()

### Query ingested data with pyiceberg

In [None]:
from pyiceberg.expressions import EqualTo
# Scan only rows with no airport fee, stopping after 100 matches, and
# materialise them as a PyArrow table.
no_airport_fee = EqualTo("Airport_fee", 0)
scan = table.scan(row_filter=no_airport_fee, limit=100).to_arrow()
assert len(scan) == 100

### Query ingested data with pyiceberg and duckdb

In [None]:
# Expose a full scan of the Iceberg table to a DuckDB connection as `nyc_taxi_trips`.
con = table.scan().to_duckdb(table_name="nyc_taxi_trips")

# Count inside DuckDB instead of pulling ~3M vendorid values into pandas just
# to take len() of the frame.
(total_rows,) = con.execute("SELECT COUNT(*) FROM nyc_taxi_trips").fetchone()
assert total_rows == 2964624, total_rows

# Alias the aggregate so the check does not depend on DuckDB's auto-generated
# column name ("max(tip_amount)").
(max_tip_amount,) = con.execute(
    "SELECT MAX(tip_amount) AS max_tip_amount FROM nyc_taxi_trips"
).fetchone()
assert max_tip_amount == 428.0, max_tip_amount