In [8]:
# Set the PYICEBERG_MAX_WORKERS environment variable for this kernel process.
# NOTE(review): presumably this sizes the thread pool PyIceberg uses for
# parallel work — confirm against the PyIceberg configuration docs, and keep
# this cell ahead of any cell that uses PyIceberg.
%env PYICEBERG_MAX_WORKERS=300

env: PYICEBERG_MAX_WORKERS=300


In [2]:
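# Optional one-time setup: uncomment to install PyIceberg (with S3 support)
# into the environment this kernel runs in.
# %pip install "pyiceberg[s3fs]"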

In [9]:
from pyiceberg.catalog.rest import RestCatalog

In [10]:
# Client for the Iceberg REST catalog listening on localhost:8181.
# The first argument is the name this client gives the catalog; everything
# else is passed through as catalog properties.
catalog_properties = {"uri": "http://localhost:8181"}
catalog = RestCatalog("public", **catalog_properties)

In [11]:
# Create the "public" namespace that will hold the taxi table.
# The catalog server outlives the notebook kernel, so on a second
# "Restart & Run All" the namespace is already there; treat that as success
# instead of aborting the run. Any other catalog error still propagates.
from pyiceberg.exceptions import NamespaceAlreadyExistsError

try:
    catalog.create_namespace("public")
except NamespaceAlreadyExistsError:
    pass  # already created by an earlier run — nothing to do

In [12]:
from pyiceberg.schema import Schema
from pyiceberg.types import (
    NestedField,
    LongType,
    TimestampType,
    DoubleType,
    StringType,
)

# Columns of the yellow-taxi trip table as (field id, column name, Iceberg type).
# Field ids are written out explicitly because the partition spec and the sort
# order in this notebook refer to columns by id (their `source_id`).
_TAXI_COLUMNS = [
    (1, "VendorID", LongType()),
    (2, "tpep_pickup_datetime", TimestampType()),
    (3, "tpep_dropoff_datetime", TimestampType()),
    (4, "passenger_count", DoubleType()),
    (5, "trip_distance", DoubleType()),
    (6, "RatecodeID", DoubleType()),
    (7, "store_and_fwd_flag", StringType()),
    (8, "PULocationID", LongType()),
    (9, "DOLocationID", LongType()),
    (10, "payment_type", LongType()),
    (11, "fare_amount", DoubleType()),
    (12, "extra", DoubleType()),
    (13, "mta_tax", DoubleType()),
    (14, "tip_amount", DoubleType()),
    (15, "tolls_amount", DoubleType()),
    (16, "improvement_surcharge", DoubleType()),
    (17, "total_amount", DoubleType()),
    (18, "congestion_surcharge", DoubleType()),
    (19, "airport_fee", DoubleType()),
]

# Every column is optional (required=False).
schema = Schema(
    *(
        NestedField(field_id=column_id, name=column_name, field_type=column_type, required=False)
        for column_id, column_name, column_type in _TAXI_COLUMNS
    )
)


In [13]:
from pyiceberg.partitioning import PartitionSpec, PartitionField
from pyiceberg.transforms import DayTransform, MonthTransform, YearTransform, BucketTransform

# Partition by the month of the pickup timestamp.
# source_id=2 is the field id of `tpep_pickup_datetime` in the schema.
pickup_month_field = PartitionField(
    source_id=2,
    field_id=1001,
    transform=MonthTransform(),
    name="tpep_pickup_datetime_month",
)

partition_spec = PartitionSpec(pickup_month_field)

In [14]:
from pyiceberg.table.sorting import SortOrder, SortField
from pyiceberg.transforms import IdentityTransform

# Declare a sort order on the raw (identity-transformed) value of field id 4,
# which is `passenger_count` in the schema.
# NOTE(review): whether the writer actually sorts rows by this order depends on
# the PyIceberg version — confirm before relying on it.
passenger_count_sort = SortField(source_id=4, transform=IdentityTransform())

sort_order = SortOrder(passenger_count_sort)

In [15]:
# Properties stored with the table.
table_properties = {
    "write.format.default": "parquet",
    "write.parquet.compression-codec": "zstd",
    # 536870912 bytes = 512 MiB
    "write.target-file-size-bytes": "536870912",
    # NOTE(review): the unit of this timeout is defined by the file-IO
    # implementation in use — confirm that 10000 is the intended value.
    "s3.connect-timeout": "10000",
}

# Create `public.nyc_taxi` with the schema, partition spec and sort order
# defined in the cells above.
table = catalog.create_table(
    identifier="public.nyc_taxi",
    schema=schema,
    partition_spec=partition_spec,
    sort_order=sort_order,
    properties=table_properties,
)

In [16]:
import requests
import io
import pyarrow.parquet as pq
from tqdm import tqdm
from datetime import datetime, timedelta

# Download one Parquet file per month and append each to the Iceberg table.
#
# Failure policy is best-effort: a month that cannot be downloaded (HTTP error,
# timeout, connection problem) is reported and skipped so the remaining months
# still load; skipped months are listed again once the loop finishes.
# NOTE: every run appends, so re-running this cell against an already-loaded
# table adds the same rows a second time.

# Base URL for the Parquet files
base_url = "https://pub-f6a668561f5e4bd6ac651efd8c18998d.r2.dev/nyc_taxi/yellow_tripdata_{}.parquet"

# (connect, read) timeouts in seconds. Without a timeout a stalled connection
# would block this cell forever.
request_timeout = (10, 120)

# Generate a list of dates from 2020-11 to 2022-12
start_date = datetime(2020, 11, 1)
end_date = datetime(2022, 12, 1)
date_list = []

current_date = start_date
while current_date <= end_date:
    date_list.append(current_date.strftime("%Y-%m"))
    # 32 days after the 1st is always inside the following month; snapping
    # back to day 1 then gives the first day of that month.
    current_date = (current_date + timedelta(days=32)).replace(day=1)

# Months whose file could not be downloaded
failed_months = []

# Create a progress bar
with tqdm(total=len(date_list), desc="Appending files") as pbar:
    for date_str in date_list:
        file_url = base_url.format(date_str)

        # Download the file content
        failure = None
        try:
            file_response = requests.get(file_url, timeout=request_timeout)
        except requests.RequestException as exc:
            failure = exc
        else:
            if file_response.status_code != 200:
                failure = file_response.status_code

        if failure is not None:
            # tqdm.write prints without corrupting the progress bar line
            tqdm.write(f"Failed to download {date_str}: {failure}")
            failed_months.append(date_str)
            # Count the month as processed so the bar still reaches 100%
            pbar.update(1)
            continue

        # Read the Parquet file from the response content
        file_content = io.BytesIO(file_response.content)
        df = pq.read_table(file_content)

        # Append to the Iceberg table
        table.append(df)

        pbar.update(1)
        pbar.set_postfix_str(f"Appended {date_str}")

if failed_months:
    print(f"Months that were not loaded: {', '.join(failed_months)}")

# Print the total number of rows in the table after appending all files.
# This reads the whole table into memory as an Arrow table just to count it.
print(f"Total rows in the table: {len(table.scan().to_arrow())}")

Appending files: 100%|██████████| 26/26 [11:30<00:00, 26.56s/it, Appended 2022-12]


Total rows in the table: 73531304
