In [None]:
!pip install tableauhyperapi



In [None]:
from dask_cuda import LocalCUDACluster
from dask.distributed import Client
import dask_cudf
from pathlib import Path
from tableauhyperapi import (
    HyperProcess, Telemetry,
    Connection, CreateMode,
    SchemaName, TableName,
    NULLABLE, SqlType, TableDefinition,
    Inserter, HyperException
)

# Cluster
# Start a local Dask CUDA cluster with default settings and attach a client
# to it, so the dask_cudf work below is scheduled on that cluster.
cluster = LocalCUDACluster()
client  = Client(cluster)

# Constants
# Columns that run_create_hyper_file_from_parquet casts to int64 after
# dropping rows whose value in that column has a fractional part.
INT_COLS = ["pickup_location_id", "dropoff_location_id"]
# Columns that run_create_hyper_file_from_parquet casts to str.
TEXT_COLS = ["pickup_date", "pickup_time", "dropoff_date", "dropoff_time"]

# Partition index the insert loop compares against when deciding which
# partitions to skip.
START_PARTITION = 3

def run_create_hyper_file_from_parquet(parquet_file_path: Path,
        table_definition: TableDefinition,
        hyper_database_path: Path,
        npartitions: int = None):
    """Load a parquet dataset with dask_cudf and append it to a Hyper table.

    The data is read lazily, optionally repartitioned, cleaned (see below),
    persisted on the cluster and then inserted into ``table_definition``'s
    table one partition at a time. Partitions with an index lower than
    ``START_PARTITION`` are skipped so that an interrupted run can be resumed;
    every partition from ``START_PARTITION`` onwards is inserted.

    Cleaning steps:
      * each column in ``INT_COLS`` is cast to float64, rows whose value has
        a fractional part are dropped (nulls are kept), and the column is then
        cast to int64;
      * each column in ``TEXT_COLS`` is cast to str.

    The Hyper database, the table's schema and the table itself are created
    when they do not exist yet and left untouched when they do, so the same
    call works for both a first run and a resumed run.

    Rows are handed to the inserter as plain tuples in the DataFrame's column
    order, so the parquet column order must match the column order of
    ``table_definition``.

    Args:
        parquet_file_path: Location of the parquet dataset to read.
        table_definition: Definition of the target Hyper table.
        hyper_database_path: Path of the ``.hyper`` file to create or append to.
        npartitions: If truthy, repartition the data into this many
            partitions before processing; otherwise keep the partitioning
            produced by the parquet reader.

    Returns:
        None. Progress and a final summary are printed.
    """
    ddf = dask_cudf.read_parquet(str(parquet_file_path))
    if npartitions:
        ddf = ddf.repartition(npartitions=npartitions)

    for c in INT_COLS:
        # Go through float64 so non-integral values can be detected with a
        # modulo check before the column is narrowed to int64.
        ddf[c] = ddf[c].astype('float64')
        mask = ddf[c].isnull() | ((ddf[c] % 1) == 0)
        ddf = ddf[mask]
        ddf[c] = ddf[c].astype('int64')

    for c in TEXT_COLS:
        ddf[c] = ddf[c].astype('str')

    ddf = ddf.persist()

    with HyperProcess(telemetry=Telemetry.SEND_USAGE_DATA_TO_TABLEAU) as hyper, \
         Connection(endpoint=hyper.endpoint,
                    database=hyper_database_path,
                    create_mode=CreateMode.CREATE_IF_NOT_EXISTS) as conn:

        # Make sure the target exists. The *_if_not_exists variants are
        # no-ops on a resumed run, where a plain create would fail.
        schema_name = table_definition.table_name.schema_name
        if schema_name is not None:
            conn.catalog.create_schema_if_not_exists(schema_name)
        conn.catalog.create_table_if_not_exists(table_definition)

        total_inserted = total_skipped = 0

        for i, part in enumerate(ddf.to_delayed()):
            # Resume support: only partitions *before* the start index have
            # been handled by an earlier run.
            if i < START_PARTITION:
                print(f"Skipping partition {i} (already processed)")
                continue

            try:
                gdf = part.compute()
                pdf = gdf.to_pandas()
                # NOTE(review): missing values are passed on exactly as
                # to_pandas() produced them; confirm the inserter accepts
                # them for the NULLABLE columns of the target table.
                rows = list(pdf.itertuples(index=False, name=None))
                with Inserter(conn, table_definition) as inserter:
                    inserter.add_rows(rows)
                    inserter.execute()
                total_inserted += len(rows)
                print(f"Inserted partition {i} ({len(rows)} rows)")
            except HyperException as e:
                # Best effort: report the failed partition and carry on with
                # the remaining ones.
                print(f"Skipped partition {i} due to HyperException: {e}")
                total_skipped += 1

        print(f"\n✅ Finished. Inserted: {total_inserted}, Skipped: {total_skipped}")


if __name__ == "__main__":
    # The Hyper file is written next to the parquet file, same name with a
    # ".hyper" suffix.
    pq_path = Path("/content/drive/MyDrive/yellow_trip_data/combined_yellow.parquet")
    hyper_path = pq_path.with_suffix(".hyper")

    # (column name, SqlType factory) pairs, in table column order.
    column_specs = [
        ("pickup_date",           SqlType.text),
        ("pickup_time",           SqlType.text),
        ("dropoff_date",          SqlType.text),
        ("dropoff_time",          SqlType.text),
        ("trip_time",             SqlType.double),
        ("passenger_count",       SqlType.double),
        ("trip_distance",         SqlType.double),
        ("pickup_location_id",    SqlType.big_int),
        ("dropoff_location_id",   SqlType.big_int),
        ("fare_amount",           SqlType.double),
        ("extra",                 SqlType.double),
        ("mta_tax",               SqlType.double),
        ("tip_amount",            SqlType.double),
        ("tolls_amount",          SqlType.double),
        ("improvement_surcharge", SqlType.double),
        ("total_amount",          SqlType.double),
        ("congestion_surcharge",  SqlType.double),
        ("Airport_fee",           SqlType.double),
        ("cbd_congestion_fee",    SqlType.double),
        ("fare_per_mile",         SqlType.double),
        ("RatecodeID",            SqlType.text),
    ]

    # Every column is nullable; the table lives in the "Extract" schema.
    table_definition = TableDefinition(
        table_name=TableName("Extract", "combined_yellow"),
        columns=[
            TableDefinition.Column(column_name, make_sql_type(), NULLABLE)
            for column_name, make_sql_type in column_specs
        ],
    )

    run_create_hyper_file_from_parquet(
        pq_path,
        table_definition,
        hyper_path,
        npartitions=8,
    )


Perhaps you already have a cluster running?
Hosting the HTTP server on port 40231 instead
INFO:distributed.scheduler:State start
INFO:distributed.scheduler:  Scheduler at:     tcp://127.0.0.1:32883
INFO:distributed.scheduler:  dashboard at:  http://127.0.0.1:40231/status
INFO:distributed.scheduler:Registering Worker plugin shuffle
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:40841'
INFO:distributed.scheduler:Register worker addr: tcp://127.0.0.1:43515 name: 0
INFO:distributed.scheduler:Starting worker compute stream, tcp://127.0.0.1:43515
INFO:distributed.core:Starting established connection to tcp://127.0.0.1:59282
INFO:distributed.scheduler:Receive client connection: Client-5219b103-2aa5-11f0-9e21-0242ac1c000c
INFO:distributed.core:Starting established connection to tcp://127.0.0.1:59288


Skipping partition 0 (already processed)
Skipping partition 1 (already processed)
Skipping partition 2 (already processed)


INFO:distributed.core:Event loop was unresponsive in Scheduler for 5.85s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
INFO:distributed.core:Event loop was unresponsive in Nanny for 5.85s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
INFO:distributed.core:Event loop was unresponsive in Nanny for 5.86s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
INFO:distributed.core:Event loop was unresponsive in Scheduler for 5.91s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.


Skipped partition 3 due to HyperException: COPY-IN cancelled.
Context: 0x3df1553f
Skipping partition 4 (already processed)
Skipping partition 5 (already processed)
Skipping partition 6 (already processed)
Skipping partition 7 (already processed)

✅ Finished. Inserted: 0, Skipped: 1


INFO:distributed.core:Event loop was unresponsive in Scheduler for 10.81s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
INFO:distributed.core:Event loop was unresponsive in Scheduler for 10.81s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
INFO:distributed.core:Event loop was unresponsive in Nanny for 10.79s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
