In [None]:
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.

<center>
<img src="https://hudi.apache.org/assets/images/hudi-logo-medium.png" alt="Hudi logo" width="100%" height="320"/>
</center>

# Querying Hudi Tables using Presto: A Step-by-Step Guide

This guide demonstrates a decoupled Data Lakehouse architecture, showcasing how to ingest and optimize data using Apache Spark and subsequently execute high-performance interactive analytics using Presto.

### 1. Library Imports

In [None]:
import os
import prestodb
import pandas as pd

### 2. Global Configuration

In [None]:
db_name = "presto_db"
table_name = "hudi_trips_table"
s3_base_path = f"s3a://warehouse/"
base_path = os.path.join(s3_base_path, db_name, table_name)

### 3. Spark Session Initialization

In [None]:
%run utils.py

In [None]:
spark = get_spark_session(app_name = "Hudi Presto Example")

### 4. Sample Data Generation

In [None]:
columns = ["ts", "uuid", "rider", "driver", "fare", "city"]

data = [
    ("2025-08-10 08:15:30", "uuid-001", "rider-A", "driver-X", 18.50, "new_york"),
    ("2025-08-10 09:22:10", "uuid-002", "rider-B", "driver-Y", 22.75, "san_francisco"),
    ("2025-08-10 10:05:45", "uuid-003", "rider-C", "driver-Z", 14.60, "chicago"),
    ("2025-08-10 11:40:00", "uuid-004", "rider-D", "driver-W", 31.90, "new_york"),
    ("2025-08-10 12:55:15", "uuid-005", "rider-E", "driver-V", 25.10, "san_francisco"),
    ("2025-08-10 13:20:35", "uuid-006", "rider-F", "driver-U", 19.80, "chicago"),
    ("2025-08-10 14:10:05", "uuid-007", "rider-G", "driver-T", 28.45, "san_francisco"),
    ("2025-08-10 15:00:20", "uuid-008", "rider-H", "driver-S", 16.25, "new_york"),
    ("2025-08-10 15:45:50", "uuid-009", "rider-I", "driver-R", 24.35, "chicago"),
    ("2025-08-10 16:30:00", "uuid-010", "rider-J", "driver-Q", 20.00, "new_york")
]

In [None]:
input_df = spark.createDataFrame(data).toDF(*columns)
display(input_df, 5)

### 5. Hudi Write Configuration & Ingestion

In [None]:
hudi_write_options = {
    "hoodie.table.name" : table_name,
    "hoodie.datasource.write.recordkey.field": "uuid",
    "hoodie.datasource.write.precombine.field": "ts",
    "hoodie.datasource.write.partitionpath.field": "city",
    "hoodie.metadata.enable": "true",
    "hoodie.datasource.write.hive_style_partitioning": "true",
    "hoodie.datasource.meta.sync.enable": "true",
    "hoodie.datasource.hive_sync.partition_fields": "city",
    "hoodie.datasource.hive_sync.mode": "hms",
    "hoodie.datasource.hive_sync.metastore.uris": "thrift://hive-metastore:9083",
    "hoodie.datasource.hive_sync.database": db_name,
    "hoodie.datasource.hive_sync.table": table_name
}

In [None]:
# Write the data to Hudi
input_df.write.format("hudi") \
    .options(**hudi_write_options) \
    .mode("overwrite") \
    .save(base_path)

In [None]:
df = spark.sql(f"SELECT * FROM {db_name}.{table_name}")
display(df, 5)

### 6. Querying with Presto

In [None]:
# Establish Presto Connection
PRESTO_HOST='presto'
PRESTO_PORT=8080
PRESTO_CATALOG='hudi'
PRESTO_SCHEMA=db_name

conn = prestodb.dbapi.connect(
    host=PRESTO_HOST,
    port=PRESTO_PORT,
    user='presto',
    catalog=PRESTO_CATALOG,
    schema=PRESTO_SCHEMA,
)

In [None]:
# Query the Data from Presto
try:
    query = f"SELECT * FROM {table_name} LIMIT 100"
    cur = conn.cursor()
    cur.execute(query)

    rows = cur.fetchall()
    colnames = [desc[0] for desc in cur.description]
    pandas_df = pd.DataFrame(rows, columns=colnames)
finally:
    cur.close()

df = spark.createDataFrame(pandas_df)
display(df, 5)

In [None]:
# Close the Presto Connection
conn.close()

### 7. Cleanup

In [None]:
# Stop the Spark Session
stop_spark_session()