Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ use databend_common_sql::plans::UDFLanguage;
use databend_common_sql::plans::UDFScriptCode;
use databend_common_sql::plans::UDFType;
use databend_common_storage::init_stage_operator;
use tempfile::TempDir;

use self::venv::TempDir;
use super::runtime_pool::Pool;
use super::runtime_pool::RuntimeBuilder;
use crate::physical_plans::UdfFunctionDesc;
Expand All @@ -60,7 +60,7 @@ static PY_VERSION: LazyLock<String> =
LazyLock::new(|| venv::detect_python_version().unwrap_or("3.12".to_string()));

impl ScriptRuntime {
pub fn try_create(func: &UdfFunctionDesc, _temp_dir: Option<Arc<TempDir>>) -> Result<Self> {
pub fn try_create(func: &UdfFunctionDesc, _temp_dir: Option<TempDir>) -> Result<Self> {
let UDFType::Script(box UDFScriptCode { language, code, .. }) = &func.udf_type else {
unreachable!()
};
Expand Down Expand Up @@ -103,7 +103,7 @@ if '{dir}' not in sys.path:
dir = import_dir,
));

let stage_paths = Self::collect_stage_sys_paths(func, temp_dir.as_ref());
let stage_paths = Self::collect_stage_sys_paths(func, temp_dir);
if !stage_paths.is_empty() {
script.push_str("for _databend_zip in (");
for (idx, path) in stage_paths.iter().enumerate() {
Expand All @@ -122,7 +122,7 @@ if '{dir}' not in sys.path:
};

if let Some(temp_dir) = &_temp_dir {
let stage_paths = Self::collect_stage_sys_paths(func, temp_dir.as_ref());
let stage_paths = Self::collect_stage_sys_paths(func, temp_dir);
if !stage_paths.is_empty() {
log::info!(
"Python UDF {:?} added stage artifacts to sys.path: {:?}",
Expand Down Expand Up @@ -589,7 +589,7 @@ impl Transform for TransformUdfScript {
}
}

type RuntimeTimeRes = BTreeMap<String, (Arc<ScriptRuntime>, Option<Arc<TempDir>>)>;
type RuntimeTimeRes = BTreeMap<String, (Arc<ScriptRuntime>, Option<TempDir>)>;

impl TransformUdfScript {
pub fn init_runtime(funcs: &[UdfFunctionDesc]) -> Result<RuntimeTimeRes> {
Expand Down Expand Up @@ -621,7 +621,7 @@ impl TransformUdfScript {
if let Some(entry) = w.get(&key) {
Some(entry.materialize().map_err(ErrorCode::from_string)?)
} else {
let temp_dir = Arc::new(venv::create_venv(PY_VERSION.as_str())?);
let temp_dir = venv::create_venv(PY_VERSION.as_str())?;
venv::install_deps(temp_dir.path(), &dependencies)?;

if !imports_stage_info.is_empty() {
Expand Down Expand Up @@ -878,7 +878,6 @@ mod venv {
use databend_common_cache::MemSized;
use parking_lot::Mutex;
use parking_lot::RwLock;
use tempfile::TempDir;
use uuid::Uuid;
use walkdir::WalkDir;
use zip::write::FileOptions;
Expand All @@ -899,6 +898,86 @@ mod venv {
base
});

static PY_VENV_WORK_DIR: LazyLock<PathBuf> = LazyLock::new(|| {
let base = std::env::temp_dir().join("databend").join("python_udf_env");
if let Err(e) = fs::create_dir_all(&base) {
panic!("Failed to create python udf work dir {:?}: {}", base, e);
}
base
});

#[derive(Clone)]
pub struct TempDir {
inner: Arc<TempDirInner>,
}

struct TempDirInner {
path: PathBuf,
}

#[derive(Default)]
struct WeakTempDir {
inner: Weak<TempDirInner>,
}

impl TempDir {
fn new_impl(path: PathBuf) -> Result<Self, String> {
if path.exists() {
fs::remove_dir_all(&path).map_err(|e| {
format!("Failed to clean python udf temp dir {:?}: {}", path, e)
})?;
}
fs::create_dir_all(&path)
.map_err(|e| format!("Failed to create python udf temp dir {:?}: {}", path, e))?;
Ok(Self {
inner: Arc::new(TempDirInner { path }),
})
}

pub fn new() -> Result<Self, String> {
let path = PY_VENV_WORK_DIR.join(Uuid::now_v7().to_string());
Self::new_impl(path)
}

pub fn new_with_path(path: PathBuf) -> Result<Self, String> {
Self::new_impl(path)
}

pub fn path(&self) -> &Path {
&self.inner.path
}

fn downgrade(&self) -> WeakTempDir {
WeakTempDir {
inner: Arc::downgrade(&self.inner),
}
}
}

impl WeakTempDir {
fn upgrade(&self) -> Option<TempDir> {
self.inner.upgrade().map(|inner| TempDir { inner })
}

fn replace(&mut self, temp_dir: &TempDir) {
self.inner = Arc::downgrade(&temp_dir.inner);
}
}

impl Drop for TempDirInner {
fn drop(&mut self) {
if let Err(e) = fs::remove_dir_all(&self.path) {
if !matches!(e.kind(), io::ErrorKind::NotFound) {
log::warn!(
"Failed to remove python udf temp dir {:?}: {}",
self.path,
e
);
}
}
}
}

pub fn install_deps(temp_dir_path: &Path, deps: &[String]) -> Result<(), String> {
if deps.is_empty() {
return Ok(());
Expand Down Expand Up @@ -963,32 +1042,19 @@ mod venv {
Ok(archive_path)
}

pub fn restore_env(archive_path: &Path) -> Result<TempDir, String> {
let temp_dir =
tempfile::tempdir().map_err(|e| format!("Failed to create temp dir: {}", e))?;
pub fn restore_env_into(archive_path: &Path, temp_dir_path: &Path) -> Result<(), String> {
let reader = File::open(archive_path)
.map_err(|e| format!("Failed to read python deps archive: {}", e))?;
let mut archive =
ZipArchive::new(reader).map_err(|e| format!("Failed to open archive: {}", e))?;
archive
.extract(temp_dir.path())
.extract(temp_dir_path)
.map_err(|e| format!("Failed to extract python deps: {}", e))?;
Ok(temp_dir)
Ok(())
}

pub fn create_venv(_python_version: &str) -> Result<TempDir, String> {
let temp_dir =
tempfile::tempdir().map_err(|e| format!("Failed to create temp dir: {}", e))?;

// let env_path = temp_dir.path().join(".venv");
// Command::new("python")
// .args(["-m", "venv", env_path.to_str().unwrap()])
// .stdout(std::process::Stdio::null())
// .stderr(std::process::Stdio::null())
// .status()
// .map_err(|e| format!("Failed to create venv: {}", e))?;

Ok(temp_dir)
TempDir::new()
}

pub fn detect_python_version() -> Result<String, String> {
Expand Down Expand Up @@ -1017,7 +1083,8 @@ mod venv {
// Add this after the PY_VERSION LazyLock declaration
// A simple LRU cache for Python virtual environments
pub(crate) struct PyVenvCacheEntry {
temp_dir: Mutex<Weak<TempDir>>,
temp_dir: Mutex<WeakTempDir>,
temp_dir_path: PathBuf,
archive_path: PathBuf,
}

Expand Down Expand Up @@ -1074,25 +1141,27 @@ mod venv {

impl MemSized for PyVenvCacheEntry {
fn mem_bytes(&self) -> usize {
std::mem::size_of::<Mutex<Weak<TempDir>>>() + std::mem::size_of::<PathBuf>()
std::mem::size_of::<Mutex<WeakTempDir>>() + 2 * std::mem::size_of::<PathBuf>()
}
}

impl PyVenvCacheEntry {
pub fn new(temp_dir: Arc<TempDir>, archive_path: PathBuf) -> Self {
pub fn new(temp_dir: TempDir, archive_path: PathBuf) -> Self {
Self {
temp_dir: Mutex::new(Arc::downgrade(&temp_dir)),
temp_dir: Mutex::new(temp_dir.downgrade()),
temp_dir_path: temp_dir.path().to_path_buf(),
archive_path,
}
}

pub fn materialize(&self) -> Result<Arc<TempDir>, String> {
pub fn materialize(&self) -> Result<TempDir, String> {
if let Some(existing) = self.temp_dir.lock().upgrade() {
return Ok(existing);
}

let temp_dir = Arc::new(restore_env(&self.archive_path)?);
*self.temp_dir.lock() = Arc::downgrade(&temp_dir);
let temp_dir = TempDir::new_with_path(self.temp_dir_path.clone())?;
restore_env_into(&self.archive_path, temp_dir.path())?;
self.temp_dir.lock().replace(&temp_dir);
Ok(temp_dir)
}
}
Expand Down
Loading