From 5aa9decde3287b2810b0a404779f8dc1dd1773b5 Mon Sep 17 00:00:00 2001 From: Song Chuanqi <8365457+freedom3219@user.noreply.gitee.com> Date: Fri, 3 Apr 2026 16:59:13 +0800 Subject: [PATCH] chore(provision): make Spark integration test provisioning idempotent --- dev/spark/provision.py | 38 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/dev/spark/provision.py b/dev/spark/provision.py index cdc538ad..a1688790 100644 --- a/dev/spark/provision.py +++ b/dev/spark/provision.py @@ -20,12 +20,50 @@ # Provisions Paimon tables into the warehouse (file:/tmp/paimon-warehouse) # for paimon-rust integration tests to read. +import shutil +from pathlib import Path +from urllib.parse import unquote, urlparse + from pyspark.sql import SparkSession +def _warehouse_path_from_spark_conf(spark: SparkSession) -> Path: + warehouse_uri = spark.conf.get("spark.sql.catalog.paimon.warehouse") + parsed = urlparse(warehouse_uri) + + if parsed.scheme not in ("", "file"): + raise ValueError( + f"Unsupported Paimon warehouse URI scheme {parsed.scheme!r}: {warehouse_uri}" + ) + + if parsed.netloc not in ("", "localhost"): + raise ValueError( + f"Unsupported remote Paimon warehouse location {parsed.netloc!r}: {warehouse_uri}" + ) + + warehouse_path = Path(unquote(parsed.path if parsed.scheme else warehouse_uri)) + if not warehouse_path.is_absolute() or str(warehouse_path) == "/": + raise ValueError(f"Refusing to clear unsafe warehouse path: {warehouse_path}") + + return warehouse_path + + +def _reset_warehouse_dir(warehouse_path: Path) -> None: + warehouse_path.mkdir(parents=True, exist_ok=True) + + for child in warehouse_path.iterdir(): + if child.is_symlink() or child.is_file(): + child.unlink() + else: + shutil.rmtree(child) + + def main(): spark = SparkSession.builder.getOrCreate() + warehouse_path = _warehouse_path_from_spark_conf(spark) + _reset_warehouse_dir(warehouse_path) + # Use Paimon catalog (configured in spark-defaults.conf with warehouse file:/tmp/paimon-warehouse) spark.sql("USE paimon.default")