In [2]:
spark.conf.get("spark.sql.catalog.demo.s3.endpoint")


'http://minio:9000'

In [3]:
from pyspark.sql import SparkSession

# 初始化 Spark，配置指向 Docker 内部的各服务
spark = SparkSession.builder \
    .appName("VideoDataAnalysis") \
    .config("spark.sql.catalog.demo.s3.path-style-access", "true") \
    .config("spark.sql.catalog.demo.s3.endpoint", "http://minio:9000") \
    .getOrCreate()

# 检查是否连接成功
spark.sql("SHOW CATALOGS").show()

26/01/31 11:06:33 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


+-------------+
|      catalog|
+-------------+
|         demo|
|spark_catalog|
+-------------+



In [4]:
# 创建数据库
spark.sql("CREATE DATABASE IF NOT EXISTS demo.video_db")


DataFrame[]

In [5]:
spark.sql("SHOW NAMESPACES IN demo").show()


+---------+
|namespace|
+---------+
| video_db|
+---------+



In [6]:
spark.sql("CREATE DATABASE IF NOT EXISTS demo.video_db")

# 创建视频元数据表
# 注意：我们模拟了结构化字段和复杂的 Pose 字段（用 JSON 字符串或 Struct）
spark.sql("""
CREATE TABLE IF NOT EXISTS demo.video_db.metadata (
    video_id STRING,
    video_tag STRING,
    video_quality FLOAT,
    video_caption STRING,
    audio_transcripts STRING,
    video_pose3d_body STRING, -- 实际建议用二进制或复杂格式，演示先用文本
    processed_at TIMESTAMP
)
USING iceberg
PARTITIONED BY (video_tag)
""")

DataFrame[]

In [8]:
from datetime import datetime

# 模拟数据
data = [
    ("vid_001", "action", 0.92, "A man jumping", "Hello world", "{'nodes': [1,2,3]}", datetime.now()),
    ("vid_002", "cooking", 0.85, "Cutting onions", "Add some salt", "{'nodes': [4,5,6]}", datetime.now())
]

columns = ["video_id", "video_tag", "video_quality", "video_caption", "audio_transcripts", "video_pose3d_body", "processed_at"]

# 创建 DataFrame 并写入
df = spark.createDataFrame(data, columns)
df.writeTo("demo.video_db.metadata").append()

print("数据写入成功！")

AnalysisException: [INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA] Cannot write incompatible data for the table `demo`.`video_db`.`metadata`: Cannot find data for the output column `video_pose3d_hand`.

In [9]:
# 查询质量得分大于 0.9 的动作类视频
result = spark.sql("""
    SELECT video_id, video_caption 
    FROM demo.video_db.metadata 
    WHERE video_quality > 0.9 AND video_tag = 'action'
""")
result.show()

+--------+-------------+
|video_id|video_caption|
+--------+-------------+
| vid_001|A man jumping|
+--------+-------------+



In [10]:
# 查看所有历史快照
spark.sql("SELECT * FROM demo.video_db.metadata.snapshots").show()

# 查询特定时间点/快照的数据 (替换成你查到的 snapshot_id)
# spark.sql("SELECT * FROM demo.video_db.metadata VERSION AS OF <snapshot_id>").show()

+--------------------+-------------------+---------+---------+--------------------+--------------------+
|        committed_at|        snapshot_id|parent_id|operation|       manifest_list|             summary|
+--------------------+-------------------+---------+---------+--------------------+--------------------+
|2026-01-31 10:56:...|7465105704115371915|     NULL|   append|s3://warehouse/vi...|{spark.app.id -> ...|
+--------------------+-------------------+---------+---------+--------------------+--------------------+



In [11]:
spark.sql("SELECT * FROM demo.video_db.metadata VERSION AS OF 7465105704115371915").show()

+--------+---------+-------------+--------------+-----------------+------------------+--------------------+
|video_id|video_tag|video_quality| video_caption|audio_transcripts| video_pose3d_body|        processed_at|
+--------+---------+-------------+--------------+-----------------+------------------+--------------------+
| vid_002|  cooking|         0.85|Cutting onions|    Add some salt|{'nodes': [4,5,6]}|2026-01-31 10:56:...|
| vid_001|   action|         0.92| A man jumping|      Hello world|{'nodes': [1,2,3]}|2026-01-31 10:56:...|
+--------+---------+-------------+--------------+-----------------+------------------+--------------------+



In [12]:
spark.sql("ALTER TABLE demo.video_db.metadata ADD COLUMN video_pose3d_hand STRING")

# 再次查看表结构，发现新字段已添加，且不需要重写旧数据
spark.sql("DESCRIBE demo.video_db.metadata").show()

AnalysisException: [FIELDS_ALREADY_EXISTS] Cannot add column, because `video_pose3d_hand` already exists in "STRUCT<video_id: STRING, video_tag: STRING, video_quality: FLOAT, video_caption: STRING, audio_transcripts: STRING, video_pose3d_body: STRING, processed_at: TIMESTAMP, video_pose3d_hand: STRING>".; line 1 pos 0;
AddColumns [QualifiedColType(None,video_pose3d_hand,StringType,true,None,None,None)]
+- ResolvedTable org.apache.iceberg.spark.SparkCatalog@68eb5729, video_db.metadata, demo.video_db.metadata, [video_id#212, video_tag#213, video_quality#214, video_caption#215, audio_transcripts#216, video_pose3d_body#217, processed_at#218, video_pose3d_hand#219]
