In [1]:
import pandas as pd
import trino
import json

# Cấu hình để Pandas hiển thị đầy đủ nội dung (quan trọng khi xem JSON)
pd.set_option('display.max_rows', 500)
pd.set_option('display.max_columns', 500)
pd.set_option('display.max_colwidth', None)

print("Các thư viện đã sẵn sàng.")

Các thư viện đã sẵn sàng.


In [6]:
# Kết nối tới Trino service
conn = trino.dbapi.connect(
    host="127.0.0.1",
    port=8083,
    user="don",
    catalog="lakehouse",   # hoặc "iceberg" tùy SHOW CATALOGS
    schema="retail",       # đây là database trong iceberg
)


cursor = conn.cursor()
print(f"Kết nối Trino (Catalog: {conn.catalog}, Schema: {conn.schema}) thành công!")

Kết nối Trino (Catalog: lakehouse, Schema: retail) thành công!


In [7]:
cursor.execute("SHOW TABLES")
tables = cursor.fetchall()
print("Các bảng có trong schema 'retail':")
print(tables)

# Kết quả mong đợi: [('bronze_raw',)]

Các bảng có trong schema 'retail':
[['bronze_detections']]


In [9]:
sql = """
SELECT
    ingest_ts,
    publish_ts,
    raw_payload,
    source_properties
FROM bronze_detections
LIMIT 1000
"""

df = pd.read_sql(sql, conn)
print(df.head())

  df = pd.read_sql(sql, conn)


DatabaseError: Execution failed on sql: 
SELECT
    ingest_ts,
    publish_ts,
    raw_payload,
    source_properties
FROM bronze_detections
LIMIT 1000

TrinoExternalError(type=EXTERNAL, name=ICEBERG_CATALOG_ERROR, message="Failed to load table: retail.bronze_detections", query_id=20251121_075617_00003_kx822)
unable to rollback

In [8]:
# Xem schema của bảng bronze_raw
df_schema = pd.read_sql("DESCRIBE bronze_raw", conn)
df_schema

  df_schema = pd.read_sql("DESCRIBE bronze_raw", conn)


DatabaseError: Execution failed on sql: DESCRIBE bronze_raw
TrinoUserError(type=USER_ERROR, name=TABLE_NOT_FOUND, message="line 1:1: Table 'lakehouse.retail.bronze_raw' does not exist", query_id=20251121_075509_00002_kx822)
unable to rollback

In [5]:
# Đếm tổng số dòng (records) trong bảng
# Con số này sẽ tăng lên nếu Flink job của bạn vẫn đang chạy
cursor.execute("SELECT COUNT(*) FROM bronze_raw")
total_rows = cursor.fetchone()[0]

print(f"Tổng số dòng (records) trong bảng 'bronze_raw': {total_rows}")

Tổng số dòng (records) trong bảng 'bronze_raw': 410


In [6]:
# Tìm ingest_ts (thời gian ghi dữ liệu) cũ nhất và mới nhất
df_time_range = pd.read_sql("""
    SELECT 
        MIN(ingest_ts) AS min_timestamp,
        MAX(ingest_ts) AS max_timestamp
    FROM bronze_raw
""", conn)

print("Phạm vi thời gian của dữ liệu:")
df_time_range

  df_time_range = pd.read_sql("""


Phạm vi thời gian của dữ liệu:


Unnamed: 0,min_timestamp,max_timestamp
0,2025-11-13 06:54:23.896000+00:00,2025-11-13 06:54:35.082000+00:00


In [7]:
# Đọc 5 dòng đầu tiên
df_sample = pd.read_sql("SELECT * FROM bronze_raw LIMIT 5", conn)
df_sample

  df_sample = pd.read_sql("SELECT * FROM bronze_raw LIMIT 5", conn)


Unnamed: 0,schema_version,pipeline_run_id,frame_index,payload,camera_id,store_id,ingest_ts
0,1.0,04e852b3b933499ea9523264aee7a8f8,1,"{""schema_version"": ""1.0"", ""pipeline_run_id"": ""04e852b3b933499ea9523264aee7a8f8"", ""source"": {""store_id"": ""store_01"", ""camera_id"": ""cam_01"", ""stream_id"": ""stream_01""}, ""frame_index"": 1, ""capture_ts"": ""2025-11-13T06:25:47.484593+00:00"", ""image_size"": {""width"": 1280, ""height"": 720}, ""detections"": [{""det_id"": ""1-0"", ""class"": null, ""class_id"": 0, ""conf"": 0.8199, ""bbox"": {""x1"": 345.0, ""y1"": 58.0, ""x2"": 438.0, ""y2"": 430.0}, ""bbox_norm"": {""x"": 0.26953125, ""y"": 0.08055555555555556, ""w"": 0.07265625, ""h"": 0.5166666666666667}, ""centroid"": {""x"": 392, ""y"": 244}, ""centroid_norm"": {""x"": 0.305859375, ""y"": 0.3388888888888889}, ""track_id"": null}]}",cam_01,store_01,2025-11-13 06:54:23.896000+00:00
1,1.0,04e852b3b933499ea9523264aee7a8f8,2,"{""schema_version"": ""1.0"", ""pipeline_run_id"": ""04e852b3b933499ea9523264aee7a8f8"", ""source"": {""store_id"": ""store_01"", ""camera_id"": ""cam_01"", ""stream_id"": ""stream_01""}, ""frame_index"": 2, ""capture_ts"": ""2025-11-13T06:25:47.614963+00:00"", ""image_size"": {""width"": 1280, ""height"": 720}, ""detections"": [{""det_id"": ""2-0"", ""class"": null, ""class_id"": 0, ""conf"": 0.8254, ""bbox"": {""x1"": 344.0, ""y1"": 58.0, ""x2"": 438.0, ""y2"": 430.0}, ""bbox_norm"": {""x"": 0.26875, ""y"": 0.08055555555555556, ""w"": 0.0734375, ""h"": 0.5166666666666667}, ""centroid"": {""x"": 391, ""y"": 244}, ""centroid_norm"": {""x"": 0.30546875, ""y"": 0.3388888888888889}, ""track_id"": null}]}",cam_01,store_01,2025-11-13 06:54:24.990000+00:00
2,1.0,04e852b3b933499ea9523264aee7a8f8,3,"{""schema_version"": ""1.0"", ""pipeline_run_id"": ""04e852b3b933499ea9523264aee7a8f8"", ""source"": {""store_id"": ""store_01"", ""camera_id"": ""cam_01"", ""stream_id"": ""stream_01""}, ""frame_index"": 3, ""capture_ts"": ""2025-11-13T06:25:47.734826+00:00"", ""image_size"": {""width"": 1280, ""height"": 720}, ""detections"": [{""det_id"": ""3-0"", ""class"": null, ""class_id"": 0, ""conf"": 0.8267, ""bbox"": {""x1"": 344.0, ""y1"": 58.0, ""x2"": 438.0, ""y2"": 429.0}, ""bbox_norm"": {""x"": 0.26875, ""y"": 0.08055555555555556, ""w"": 0.0734375, ""h"": 0.5152777777777777}, ""centroid"": {""x"": 391, ""y"": 244}, ""centroid_norm"": {""x"": 0.30546875, ""y"": 0.33819444444444446}, ""track_id"": 1}]}",cam_01,store_01,2025-11-13 06:54:24.992000+00:00
3,1.0,04e852b3b933499ea9523264aee7a8f8,4,"{""schema_version"": ""1.0"", ""pipeline_run_id"": ""04e852b3b933499ea9523264aee7a8f8"", ""source"": {""store_id"": ""store_01"", ""camera_id"": ""cam_01"", ""stream_id"": ""stream_01""}, ""frame_index"": 4, ""capture_ts"": ""2025-11-13T06:25:47.866162+00:00"", ""image_size"": {""width"": 1280, ""height"": 720}, ""detections"": [{""det_id"": ""4-0"", ""class"": null, ""class_id"": 0, ""conf"": 0.8237, ""bbox"": {""x1"": 344.0, ""y1"": 57.0, ""x2"": 437.0, ""y2"": 430.0}, ""bbox_norm"": {""x"": 0.26875, ""y"": 0.07916666666666666, ""w"": 0.07265625, ""h"": 0.5180555555555556}, ""centroid"": {""x"": 390, ""y"": 244}, ""centroid_norm"": {""x"": 0.305078125, ""y"": 0.33819444444444446}, ""track_id"": 1}]}",cam_01,store_01,2025-11-13 06:54:24.994000+00:00
4,1.0,04e852b3b933499ea9523264aee7a8f8,5,"{""schema_version"": ""1.0"", ""pipeline_run_id"": ""04e852b3b933499ea9523264aee7a8f8"", ""source"": {""store_id"": ""store_01"", ""camera_id"": ""cam_01"", ""stream_id"": ""stream_01""}, ""frame_index"": 5, ""capture_ts"": ""2025-11-13T06:25:47.989360+00:00"", ""image_size"": {""width"": 1280, ""height"": 720}, ""detections"": [{""det_id"": ""5-0"", ""class"": null, ""class_id"": 0, ""conf"": 0.8251, ""bbox"": {""x1"": 344.0, ""y1"": 58.0, ""x2"": 437.0, ""y2"": 431.0}, ""bbox_norm"": {""x"": 0.26875, ""y"": 0.08055555555555556, ""w"": 0.07265625, ""h"": 0.5180555555555556}, ""centroid"": {""x"": 390, ""y"": 244}, ""centroid_norm"": {""x"": 0.305078125, ""y"": 0.33958333333333335}, ""track_id"": 1}]}",cam_01,store_01,2025-11-13 06:54:24.996000+00:00


In [8]:
# Lấy payload của 1 dòng
cursor.execute("SELECT payload FROM bronze_raw WHERE payload IS NOT NULL LIMIT 1")
result = cursor.fetchone()

if result:
    raw_payload_str = result[0]
    
    # Parse (phân tích) chuỗi JSON bằng Python
    payload_json = json.loads(raw_payload_str)
    
    # In ra cấu trúc JSON cho dễ đọc
    print("Cấu trúc của 1 dòng dữ liệu 'payload':")
    print(json.dumps(payload_json, indent=2, ensure_ascii=False))
else:
    print("Không tìm thấy dòng nào có payload. (Có thể job Flink chưa ghi dữ liệu?)")

Cấu trúc của 1 dòng dữ liệu 'payload':
{
  "schema_version": "1.0",
  "pipeline_run_id": "04e852b3b933499ea9523264aee7a8f8",
  "source": {
    "store_id": "store_01",
    "camera_id": "cam_01",
    "stream_id": "stream_01"
  },
  "frame_index": 1,
  "capture_ts": "2025-11-13T06:25:47.484593+00:00",
  "image_size": {
    "width": 1280,
    "height": 720
  },
  "detections": [
    {
      "det_id": "1-0",
      "class": null,
      "class_id": 0,
      "conf": 0.8199,
      "bbox": {
        "x1": 345.0,
        "y1": 58.0,
        "x2": 438.0,
        "y2": 430.0
      },
      "bbox_norm": {
        "x": 0.26953125,
        "y": 0.08055555555555556,
        "w": 0.07265625,
        "h": 0.5166666666666667
      },
      "centroid": {
        "x": 392,
        "y": 244
      },
      "centroid_norm": {
        "x": 0.305859375,
        "y": 0.3388888888888889
      },
      "track_id": null
    }
  ]
}


In [9]:
# Đếm số dòng (records) theo store_id và camera_id
df_counts = pd.read_sql("""
    SELECT 
        store_id, 
        camera_id, 
        COUNT(*) AS total_records
    FROM bronze_raw
    GROUP BY 1, 2
    ORDER BY 3 DESC
""", conn)

print("Phân bố dữ liệu theo camera/store:")
df_counts

  df_counts = pd.read_sql("""


Phân bố dữ liệu theo camera/store:


Unnamed: 0,store_id,camera_id,total_records
0,store_01,cam_01,410


In [10]:
from sqlalchemy import create_engine
import pandas as pd

engine = create_engine("trino://python-client@localhost:8080/lakehouse/rva")

sql = """
SELECT record_count, file_size_in_bytes, file_path
FROM "bronze_raw$files"
ORDER BY file_size_in_bytes DESC
LIMIT 10
"""
df = pd.read_sql(sql, engine)

