In [1]:
import pandas as pd
from typing import Optional
from trino.dbapi import connect

def query_from_trino(sql: str) -> Optional[pd.DataFrame]:
    with connect(
        host="127.0.0.1",
        port=8080,
        user="mark",
        catalog="iceberg",
        schema="iot_sensor",
    ) as conn:
        cur = conn.cursor()
        cur.execute(sql)
        if cur.description is None:
            return
        fields = [des.name for des in cur.description]
        rows = cur.fetchall()
        records = [dict(zip(fields, row)) for row in rows]
    return pd.DataFrame(records)

In [2]:
sql = "select count(1) as total from iceberg.iot_sensor.equipment_data"
query_from_trino(sql)

Unnamed: 0,total
0,43800


In [3]:
sql = """
    SELECT
        timestamp,
        "temperature (°c)",
        "vibration (mm/s)",
        "pressure (pa)",
        "rpm",
        "maintenance required",
        "temp_change",
        "vib_change"
    FROM iceberg.iot_sensor.equipment_data limit 5
"""
query_from_trino(sql)

Unnamed: 0,timestamp,temperature (°c),vibration (mm/s),pressure (pa),rpm,maintenance required,temp_change,vib_change
0,2019-01-01 00:00:00,0.548793,0.50984,0.619918,0.741161,1,0.501129,0.499619
1,2019-01-01 03:00:00,0.544862,0.500041,0.519955,0.931955,0,0.471992,0.602794
2,2019-01-01 04:00:00,0.423622,0.031649,0.323663,0.799867,0,0.440103,0.264552
3,2019-01-01 01:00:00,0.715185,0.587573,0.802121,0.292862,1,0.584882,0.53863
4,2019-01-01 02:00:00,0.602748,0.294453,0.965546,0.624416,1,0.444534,0.352514


In [4]:
sql = """
    SELECT
        DATE_TRUNC('day', timestamp) AS dt, 
        COUNT_IF("maintenance required" = 1) as maintenance_count,
        count(1) as total,
        MAX("temperature (°c)") AS max_temp,
        MIN("temperature (°c)") AS min_temp
    FROM iceberg.iot_sensor.equipment_data
    GROUP BY DATE_TRUNC('day', timestamp)
    ORDER BY dt DESC
"""
query_from_trino(sql)

Unnamed: 0,dt,maintenance_count,total,max_temp,min_temp
0,2023-12-30,13,24,0.917329,0.023276
1,2023-12-29,11,24,0.925015,0.078644
2,2023-12-28,8,24,0.971311,0.011073
3,2023-12-27,13,24,0.987483,0.013627
4,2023-12-26,14,24,0.968619,0.022891
...,...,...,...,...,...
1820,2019-01-05,9,24,0.962207,0.004623
1821,2019-01-04,11,24,0.976781,0.039119
1822,2019-01-03,9,24,0.988395,0.096035
1823,2019-01-02,11,24,0.944686,0.018719


In [12]:
# Show iceberg table snapshots

sql = """
    SELECT * FROM iceberg.iot_sensor."equipment_data$history"
"""
query_from_trino(sql)

Unnamed: 0,made_current_at,snapshot_id,parent_id,is_current_ancestor
0,2024-12-06 12:47:33.507000+08:00,2625041281614463484,,True
1,2024-12-06 14:40:15.627000+08:00,5065079296734979223,2.625041e+18,False
2,2024-12-06 14:49:57.272000+08:00,343143447224900953,2.625041e+18,False
3,2024-12-06 15:04:51.169000+08:00,828904674460009984,2.625041e+18,True


In [None]:
# Insert new records
sql = """
INSERT INTO iceberg.iot_sensor.equipment_data VALUES
(TIMESTAMP '2024-12-06 00:00:00', 0.148793, 0.509840, 0.619918, 0.741161, 1, 0.501129, 0.499619),
(TIMESTAMP '2024-12-06 01:00:00', 0.348793, 0.509840, 0.519918, 0.741161, 1, 0.501129, 0.499619),
(TIMESTAMP '2024-12-06 02:00:00', 0.248793, 0.509840, 0.419918, 0.741161, 0, 0.501129, 0.499619),
(TIMESTAMP '2024-12-06 03:00:00', 0.448793, 0.509840, 0.319918, 0.741161, 0, 0.501129, 0.499619)
"""
query_from_trino(sql)

Unnamed: 0,rows
0,4


In [10]:
sql = "select count(1) as total from iceberg.iot_sensor.equipment_data"
query_from_trino(sql)

Unnamed: 0,total
0,43804


In [11]:
sql = """
    SELECT * FROM iceberg.iot_sensor."equipment_data$snapshots"
"""
query_from_trino(sql)

Unnamed: 0,committed_at,snapshot_id,parent_id,operation,manifest_list,summary
0,2024-12-06 12:47:33.507000+08:00,2625041281614463484,,append,s3://datalakehouse/iot_sensor/equipment_data_4...,"{'added-files-size': '2514477', 'added-data-fi..."
1,2024-12-06 14:40:15.627000+08:00,5065079296734979223,2.625041e+18,append,s3://datalakehouse/iot_sensor/equipment_data_4...,{'trino_query_id': '20241206_064014_00041_zvxf...
2,2024-12-06 14:49:57.272000+08:00,343143447224900953,2.625041e+18,append,s3://datalakehouse/iot_sensor/equipment_data_4...,{'trino_query_id': '20241206_064957_00054_zvxf...
3,2024-12-06 15:04:51.169000+08:00,828904674460009984,2.625041e+18,append,s3://datalakehouse/iot_sensor/equipment_data_4...,{'trino_query_id': '20241206_070450_00007_ffu7...


In [13]:
# Perform Time travel
sql = """
CALL iceberg.system.rollback_to_snapshot('iot_sensor', 'equipment_data', 2625041281614463484)
"""
query_from_trino(sql)

In [14]:
sql = "select count(1) as total from iceberg.iot_sensor.equipment_data"
query_from_trino(sql)

Unnamed: 0,total
0,43800
