Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions dev/spark/provision.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,50 @@
# Provisions Paimon tables into the warehouse (file:/tmp/paimon-warehouse)
# for paimon-rust integration tests to read.

import shutil
from pathlib import Path
from urllib.parse import unquote, urlparse

from pyspark.sql import SparkSession


def _warehouse_path_from_spark_conf(spark: SparkSession) -> Path:
    """Resolve the configured Paimon warehouse location to a local ``Path``.

    Reads ``spark.sql.catalog.paimon.warehouse`` from the Spark session and
    accepts only local (``file:`` or scheme-less) URIs.  Anything that looks
    remote, relative, or equal to ``/`` is rejected with ``ValueError`` so a
    later "clear the warehouse" step can never touch an unsafe location.
    """
    uri = spark.conf.get("spark.sql.catalog.paimon.warehouse")
    parts = urlparse(uri)

    # Only a local filesystem warehouse makes sense for these tests.
    if parts.scheme not in ("", "file"):
        raise ValueError(
            f"Unsupported Paimon warehouse URI scheme {parts.scheme!r}: {uri}"
        )

    # Any authority other than empty/localhost would name a remote host.
    if parts.netloc not in ("", "localhost"):
        raise ValueError(
            f"Unsupported remote Paimon warehouse location {parts.netloc!r}: {uri}"
        )

    # With no scheme, the configured value is already a plain path.
    raw_path = parts.path if parts.scheme else uri
    resolved = Path(unquote(raw_path))

    # Refuse the filesystem root and relative paths outright.
    if str(resolved) == "/" or not resolved.is_absolute():
        raise ValueError(f"Refusing to clear unsafe warehouse path: {resolved}")

    return resolved


def _reset_warehouse_dir(warehouse_path: Path) -> None:
warehouse_path.mkdir(parents=True, exist_ok=True)

for child in warehouse_path.iterdir():
if child.is_symlink() or child.is_file():
child.unlink()
else:
shutil.rmtree(child)


def main():
spark = SparkSession.builder.getOrCreate()

warehouse_path = _warehouse_path_from_spark_conf(spark)
_reset_warehouse_dir(warehouse_path)

# Use Paimon catalog (configured in spark-defaults.conf with warehouse file:/tmp/paimon-warehouse)
spark.sql("USE paimon.default")

Expand Down
Loading