diff --git a/src/query/service/src/pipelines/processors/transforms/transform_udf_script.rs b/src/query/service/src/pipelines/processors/transforms/transform_udf_script.rs index 63ce8910c00fe..d79ddab826e0a 100644 --- a/src/query/service/src/pipelines/processors/transforms/transform_udf_script.rs +++ b/src/query/service/src/pipelines/processors/transforms/transform_udf_script.rs @@ -43,8 +43,8 @@ use databend_common_sql::plans::UDFLanguage; use databend_common_sql::plans::UDFScriptCode; use databend_common_sql::plans::UDFType; use databend_common_storage::init_stage_operator; -use tempfile::TempDir; +use self::venv::TempDir; use super::runtime_pool::Pool; use super::runtime_pool::RuntimeBuilder; use crate::physical_plans::UdfFunctionDesc; @@ -60,7 +60,7 @@ static PY_VERSION: LazyLock = LazyLock::new(|| venv::detect_python_version().unwrap_or("3.12".to_string())); impl ScriptRuntime { - pub fn try_create(func: &UdfFunctionDesc, _temp_dir: Option>) -> Result { + pub fn try_create(func: &UdfFunctionDesc, _temp_dir: Option) -> Result { let UDFType::Script(box UDFScriptCode { language, code, .. }) = &func.udf_type else { unreachable!() }; @@ -103,7 +103,7 @@ if '{dir}' not in sys.path: dir = import_dir, )); - let stage_paths = Self::collect_stage_sys_paths(func, temp_dir.as_ref()); + let stage_paths = Self::collect_stage_sys_paths(func, temp_dir); if !stage_paths.is_empty() { script.push_str("for _databend_zip in ("); for (idx, path) in stage_paths.iter().enumerate() { @@ -122,7 +122,7 @@ if '{dir}' not in sys.path: }; if let Some(temp_dir) = &_temp_dir { - let stage_paths = Self::collect_stage_sys_paths(func, temp_dir.as_ref()); + let stage_paths = Self::collect_stage_sys_paths(func, temp_dir); if !stage_paths.is_empty() { log::info!( "Python UDF {:?} added stage artifacts to sys.path: {:?}", @@ -589,7 +589,7 @@ impl Transform for TransformUdfScript { } } -type RuntimeTimeRes = BTreeMap, Option>)>; +type RuntimeTimeRes = BTreeMap, Option)>; impl TransformUdfScript { pub fn init_runtime(funcs: &[UdfFunctionDesc]) -> Result { @@ -621,7 +621,7 @@ impl TransformUdfScript { if let Some(entry) = w.get(&key) { Some(entry.materialize().map_err(ErrorCode::from_string)?) } else { - let temp_dir = Arc::new(venv::create_venv(PY_VERSION.as_str())?); + let temp_dir = venv::create_venv(PY_VERSION.as_str())?; venv::install_deps(temp_dir.path(), &dependencies)?; if !imports_stage_info.is_empty() { @@ -878,7 +878,6 @@ mod venv { use databend_common_cache::MemSized; use parking_lot::Mutex; use parking_lot::RwLock; - use tempfile::TempDir; use uuid::Uuid; use walkdir::WalkDir; use zip::write::FileOptions; @@ -899,6 +898,86 @@ mod venv { base }); + static PY_VENV_WORK_DIR: LazyLock = LazyLock::new(|| { + let base = std::env::temp_dir().join("databend").join("python_udf_env"); + if let Err(e) = fs::create_dir_all(&base) { + panic!("Failed to create python udf work dir {:?}: {}", base, e); + } + base + }); + + #[derive(Clone)] + pub struct TempDir { + inner: Arc, + } + + struct TempDirInner { + path: PathBuf, + } + + #[derive(Default)] + struct WeakTempDir { + inner: Weak, + } + + impl TempDir { + fn new_impl(path: PathBuf) -> Result { + if path.exists() { + fs::remove_dir_all(&path).map_err(|e| { + format!("Failed to clean python udf temp dir {:?}: {}", path, e) + })?; + } + fs::create_dir_all(&path) + .map_err(|e| format!("Failed to create python udf temp dir {:?}: {}", path, e))?; + Ok(Self { + inner: Arc::new(TempDirInner { path }), + }) + } + + pub fn new() -> Result { + let path = PY_VENV_WORK_DIR.join(Uuid::now_v7().to_string()); + Self::new_impl(path) + } + + pub fn new_with_path(path: PathBuf) -> Result { + Self::new_impl(path) + } + + pub fn path(&self) -> &Path { + &self.inner.path + } + + fn downgrade(&self) -> WeakTempDir { + WeakTempDir { + inner: Arc::downgrade(&self.inner), + } + } + } + + impl WeakTempDir { + fn upgrade(&self) -> Option { + self.inner.upgrade().map(|inner| TempDir { inner }) + } + + fn replace(&mut self, temp_dir: &TempDir) { + self.inner = Arc::downgrade(&temp_dir.inner); + } + } + + impl Drop for TempDirInner { + fn drop(&mut self) { + if let Err(e) = fs::remove_dir_all(&self.path) { + if !matches!(e.kind(), io::ErrorKind::NotFound) { + log::warn!( + "Failed to remove python udf temp dir {:?}: {}", + self.path, + e + ); + } + } + } + } + pub fn install_deps(temp_dir_path: &Path, deps: &[String]) -> Result<(), String> { if deps.is_empty() { return Ok(()); @@ -963,32 +1042,19 @@ mod venv { Ok(archive_path) } - pub fn restore_env(archive_path: &Path) -> Result { - let temp_dir = - tempfile::tempdir().map_err(|e| format!("Failed to create temp dir: {}", e))?; + pub fn restore_env_into(archive_path: &Path, temp_dir_path: &Path) -> Result<(), String> { let reader = File::open(archive_path) .map_err(|e| format!("Failed to read python deps archive: {}", e))?; let mut archive = ZipArchive::new(reader).map_err(|e| format!("Failed to open archive: {}", e))?; archive - .extract(temp_dir.path()) + .extract(temp_dir_path) .map_err(|e| format!("Failed to extract python deps: {}", e))?; - Ok(temp_dir) + Ok(()) } pub fn create_venv(_python_version: &str) -> Result { - let temp_dir = - tempfile::tempdir().map_err(|e| format!("Failed to create temp dir: {}", e))?; - - // let env_path = temp_dir.path().join(".venv"); - // Command::new("python") - // .args(["-m", "venv", env_path.to_str().unwrap()]) - // .stdout(std::process::Stdio::null()) - // .stderr(std::process::Stdio::null()) - // .status() - // .map_err(|e| format!("Failed to create venv: {}", e))?; - - Ok(temp_dir) + TempDir::new() } pub fn detect_python_version() -> Result { @@ -1017,7 +1083,8 @@ mod venv { // Add this after the PY_VERSION LazyLock declaration // A simple LRU cache for Python virtual environments pub(crate) struct PyVenvCacheEntry { - temp_dir: Mutex>, + temp_dir: Mutex, + temp_dir_path: PathBuf, archive_path: PathBuf, } @@ -1074,25 +1141,27 @@ mod venv { impl MemSized for PyVenvCacheEntry { fn mem_bytes(&self) -> usize { - std::mem::size_of::>>() + std::mem::size_of::() + std::mem::size_of::>() + 2 * std::mem::size_of::() } } impl PyVenvCacheEntry { - pub fn new(temp_dir: Arc, archive_path: PathBuf) -> Self { + pub fn new(temp_dir: TempDir, archive_path: PathBuf) -> Self { Self { - temp_dir: Mutex::new(Arc::downgrade(&temp_dir)), + temp_dir: Mutex::new(temp_dir.downgrade()), + temp_dir_path: temp_dir.path().to_path_buf(), archive_path, } } - pub fn materialize(&self) -> Result, String> { + pub fn materialize(&self) -> Result { if let Some(existing) = self.temp_dir.lock().upgrade() { return Ok(existing); } - let temp_dir = Arc::new(restore_env(&self.archive_path)?); - *self.temp_dir.lock() = Arc::downgrade(&temp_dir); + let temp_dir = TempDir::new_with_path(self.temp_dir_path.clone())?; + restore_env_into(&self.archive_path, temp_dir.path())?; + self.temp_dir.lock().replace(&temp_dir); Ok(temp_dir) } }