From 6de2a5b1f7d2a5f44eaba55ea9407b7e63995392 Mon Sep 17 00:00:00 2001
From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com>
Date: Fri, 17 May 2024 15:07:58 -0400
Subject: [PATCH] fix(python): release GIL on most operations (#2512)

Fixes #2269
---
 python/src/lib.rs | 879 ++++++++++++++++++++++++----------------------
 1 file changed, 468 insertions(+), 411 deletions(-)

diff --git a/python/src/lib.rs b/python/src/lib.rs
index bf06fd4b21..8b9b4482c7 100644
--- a/python/src/lib.rs
+++ b/python/src/lib.rs
@@ -98,36 +98,39 @@ impl RawDeltaTable {
     #[new]
     #[pyo3(signature = (table_uri, version = None, storage_options = None, without_files = false, log_buffer_size = None))]
     fn new(
+        py: Python,
         table_uri: &str,
         version: Option<i64>,
         storage_options: Option<HashMap<String, String>>,
         without_files: bool,
         log_buffer_size: Option<usize>,
     ) -> PyResult<Self> {
-        let mut builder = deltalake::DeltaTableBuilder::from_uri(table_uri);
-        let options = storage_options.clone().unwrap_or_default();
-        if let Some(storage_options) = storage_options {
-            builder = builder.with_storage_options(storage_options)
-        }
-        if let Some(version) = version {
-            builder = builder.with_version(version)
-        }
-        if without_files {
-            builder = builder.without_files()
-        }
-        if let Some(buf_size) = log_buffer_size {
-            builder = builder
-                .with_log_buffer_size(buf_size)
-                .map_err(PythonError::from)?;
-        }
+        py.allow_threads(|| {
+            let mut builder = deltalake::DeltaTableBuilder::from_uri(table_uri);
+            let options = storage_options.clone().unwrap_or_default();
+            if let Some(storage_options) = storage_options {
+                builder = builder.with_storage_options(storage_options)
+            }
+            if let Some(version) = version {
+                builder = builder.with_version(version)
+            }
+            if without_files {
+                builder = builder.without_files()
+            }
+            if let Some(buf_size) = log_buffer_size {
+                builder = builder
+                    .with_log_buffer_size(buf_size)
+                    .map_err(PythonError::from)?;
+            }
 
-        let table = rt().block_on(builder.load()).map_err(PythonError::from)?;
-        Ok(RawDeltaTable {
-            _table: table,
-            _config: FsConfig {
-                root_url: table_uri.into(),
-                options,
-            },
+            let table = rt().block_on(builder.load()).map_err(PythonError::from)?;
+            Ok(RawDeltaTable {
+                _table: table,
+                _config: FsConfig {
+                    root_url: table_uri.into(),
+                    options,
+                },
+            })
         })
     }
 
@@ -194,16 +197,20 @@ impl RawDeltaTable {
             .map_err(PythonError::from)?)
     }
 
-    pub fn load_version(&mut self, version: i64) -> PyResult<()> {
-        Ok(rt()
-            .block_on(self._table.load_version(version))
-            .map_err(PythonError::from)?)
+    pub fn load_version(&mut self, py: Python, version: i64) -> PyResult<()> {
+        py.allow_threads(|| {
+            Ok(rt()
+                .block_on(self._table.load_version(version))
+                .map_err(PythonError::from)?)
+        })
    }
 
-    pub fn get_latest_version(&mut self) -> PyResult<i64> {
-        Ok(rt()
-            .block_on(self._table.get_latest_version())
-            .map_err(PythonError::from)?)
+    pub fn get_latest_version(&mut self, py: Python) -> PyResult<i64> {
+        py.allow_threads(|| {
+            Ok(rt()
+                .block_on(self._table.get_latest_version())
+                .map_err(PythonError::from)?)
+        })
     }
 
     pub fn get_num_index_cols(&mut self) -> PyResult<i32> {
@@ -225,64 +232,73 @@ impl RawDeltaTable {
             .map(|v| v.iter().map(|v| v.to_string()).collect::<Vec<String>>()))
     }
 
-    pub fn load_with_datetime(&mut self, ds: &str) -> PyResult<()> {
-        let datetime =
-            DateTime::<Utc>::from(DateTime::<FixedOffset>::parse_from_rfc3339(ds).map_err(
-                |err| PyValueError::new_err(format!("Failed to parse datetime string: {err}")),
-            )?);
-        Ok(rt()
-            .block_on(self._table.load_with_datetime(datetime))
-            .map_err(PythonError::from)?)
+    pub fn load_with_datetime(&mut self, py: Python, ds: &str) -> PyResult<()> {
+        py.allow_threads(|| {
+            let datetime =
+                DateTime::<Utc>::from(DateTime::<FixedOffset>::parse_from_rfc3339(ds).map_err(
+                    |err| PyValueError::new_err(format!("Failed to parse datetime string: {err}")),
+                )?);
+            Ok(rt()
+                .block_on(self._table.load_with_datetime(datetime))
+                .map_err(PythonError::from)?)
+        })
     }
 
     pub fn files_by_partitions(
         &self,
+        py: Python,
         partitions_filters: Vec<(&str, &str, PartitionFilterValue)>,
     ) -> PyResult<Vec<String>> {
-        let partition_filters: Result<Vec<PartitionFilter>, DeltaTableError> = partitions_filters
-            .into_iter()
-            .map(|filter| match filter {
-                (key, op, PartitionFilterValue::Single(v)) => {
-                    PartitionFilter::try_from((key, op, v))
-                }
-                (key, op, PartitionFilterValue::Multiple(v)) => {
-                    PartitionFilter::try_from((key, op, v.as_slice()))
-                }
-            })
-            .collect();
-        match partition_filters {
-            Ok(filters) => Ok(self
-                ._table
-                .get_files_by_partitions(&filters)
-                .map_err(PythonError::from)?
-                .into_iter()
-                .map(|p| p.to_string())
-                .collect()),
-            Err(err) => Err(PythonError::from(err).into()),
-        }
+        py.allow_threads(|| {
+            let partition_filters: Result<Vec<PartitionFilter>, DeltaTableError> =
+                partitions_filters
+                    .into_iter()
+                    .map(|filter| match filter {
+                        (key, op, PartitionFilterValue::Single(v)) => {
+                            PartitionFilter::try_from((key, op, v))
+                        }
+                        (key, op, PartitionFilterValue::Multiple(v)) => {
+                            PartitionFilter::try_from((key, op, v.as_slice()))
+                        }
+                    })
+                    .collect();
+            match partition_filters {
+                Ok(filters) => Ok(self
+                    ._table
+                    .get_files_by_partitions(&filters)
+                    .map_err(PythonError::from)?
+                    .into_iter()
+                    .map(|p| p.to_string())
+                    .collect()),
+                Err(err) => Err(PythonError::from(err).into()),
+            }
+        })
     }
 
     pub fn files(
         &self,
+        py: Python,
         partition_filters: Option<Vec<(&str, &str, PartitionFilterValue)>>,
     ) -> PyResult<Vec<String>> {
-        if let Some(filters) = partition_filters {
-            let filters = convert_partition_filters(filters).map_err(PythonError::from)?;
-            Ok(self
-                ._table
-                .get_files_by_partitions(&filters)
-                .map_err(PythonError::from)?
-                .into_iter()
-                .map(|p| p.to_string())
-                .collect())
-        } else {
-            Ok(self
-                ._table
-                .get_files_iter()
-                .map_err(PythonError::from)?
-                .map(|f| f.to_string())
-                .collect())
-        }
+        py.allow_threads(|| {
+            if let Some(filters) = partition_filters {
+                let filters = convert_partition_filters(filters).map_err(PythonError::from)?;
+                Ok(self
+                    ._table
+                    .get_files_by_partitions(&filters)
+                    .map_err(PythonError::from)?
+                    .into_iter()
+                    .map(|p| p.to_string())
+                    .collect())
+            } else {
+                Ok(self
+                    ._table
+                    .get_files_iter()
+                    .map_err(PythonError::from)?
+                    .map(|f| f.to_string())
+                    .collect())
+            }
+        })
     }
 
     pub fn file_uris(
@@ -315,31 +331,33 @@ impl RawDeltaTable {
     #[pyo3(signature = (dry_run, retention_hours = None, enforce_retention_duration = true, custom_metadata=None))]
     pub fn vacuum(
         &mut self,
+        py: Python,
         dry_run: bool,
         retention_hours: Option<u64>,
         enforce_retention_duration: bool,
         custom_metadata: Option<HashMap<String, String>>,
     ) -> PyResult<Vec<String>> {
-        let mut cmd = VacuumBuilder::new(
-            self._table.log_store(),
-            self._table.snapshot().map_err(PythonError::from)?.clone(),
-        )
-        .with_enforce_retention_duration(enforce_retention_duration)
-        .with_dry_run(dry_run);
-        if let Some(retention_period) = retention_hours {
-            cmd = cmd.with_retention_period(Duration::hours(retention_period as i64));
-        }
+        let (table, metrics) = py.allow_threads(|| {
+            let mut cmd = VacuumBuilder::new(
+                self._table.log_store(),
+                self._table.snapshot().map_err(PythonError::from)?.clone(),
+            )
+            .with_enforce_retention_duration(enforce_retention_duration)
+            .with_dry_run(dry_run);
+            if let Some(retention_period) = retention_hours {
+                cmd = cmd.with_retention_period(Duration::hours(retention_period as i64));
+            }
 
-        if let Some(metadata) = custom_metadata {
-            let json_metadata: Map<String, Value> =
-                metadata.into_iter().map(|(k, v)| (k, v.into())).collect();
-            cmd = cmd
-                .with_commit_properties(CommitProperties::default().with_metadata(json_metadata));
-        };
+            if let Some(metadata) = custom_metadata {
+                let json_metadata: Map<String, Value> =
+                    metadata.into_iter().map(|(k, v)| (k, v.into())).collect();
+                cmd = cmd.with_commit_properties(
+                    CommitProperties::default().with_metadata(json_metadata),
+                );
+            };
 
-        let (table, metrics) = rt()
-            .block_on(cmd.into_future())
-            .map_err(PythonError::from)?;
+            rt().block_on(cmd.into_future()).map_err(PythonError::from)
+        })?;
         self._table.state = table.state;
         Ok(metrics.files_deleted)
     }
@@ -348,42 +366,44 @@ impl RawDeltaTable {
     #[pyo3(signature = (updates, predicate=None, writer_properties=None, safe_cast = false, custom_metadata = None))]
     pub fn update(
         &mut self,
+        py: Python,
         updates: HashMap<String, String>,
         predicate: Option<String>,
         writer_properties: Option<HashMap<String, Option<String>>>,
         safe_cast: bool,
         custom_metadata: Option<HashMap<String, String>>,
     ) -> PyResult<String> {
-        let mut cmd = UpdateBuilder::new(
-            self._table.log_store(),
-            self._table.snapshot().map_err(PythonError::from)?.clone(),
-        )
-        .with_safe_cast(safe_cast);
+        let (table, metrics) = py.allow_threads(|| {
+            let mut cmd = UpdateBuilder::new(
+                self._table.log_store(),
+                self._table.snapshot().map_err(PythonError::from)?.clone(),
+            )
+            .with_safe_cast(safe_cast);
 
-        if let Some(writer_props) = writer_properties {
-            cmd = cmd.with_writer_properties(
-                set_writer_properties(writer_props).map_err(PythonError::from)?,
-            );
-        }
+            if let Some(writer_props) = writer_properties {
+                cmd = cmd.with_writer_properties(
+                    set_writer_properties(writer_props).map_err(PythonError::from)?,
+                );
+            }
 
-        for (col_name, expression) in updates {
-            cmd = cmd.with_update(col_name.clone(), expression.clone());
-        }
+            for (col_name, expression) in updates {
+                cmd = cmd.with_update(col_name.clone(), expression.clone());
+            }
 
-        if let Some(update_predicate) = predicate {
-            cmd = cmd.with_predicate(update_predicate);
-        }
+            if let Some(update_predicate) = predicate {
+                cmd = cmd.with_predicate(update_predicate);
+            }
 
-        if let Some(metadata) = custom_metadata {
-            let json_metadata: Map<String, Value> =
-                metadata.into_iter().map(|(k, v)| (k, v.into())).collect();
-            cmd = cmd
-                .with_commit_properties(CommitProperties::default().with_metadata(json_metadata));
-        };
+            if let Some(metadata) = custom_metadata {
+                let json_metadata: Map<String, Value> =
+                    metadata.into_iter().map(|(k, v)| (k, v.into())).collect();
+                cmd = cmd.with_commit_properties(
+                    CommitProperties::default().with_metadata(json_metadata),
+                );
+            };
 
-        let (table, metrics) = rt()
-            .block_on(cmd.into_future())
-            .map_err(PythonError::from)?;
+            rt().block_on(cmd.into_future()).map_err(PythonError::from)
+        })?;
         self._table.state = table.state;
         Ok(serde_json::to_string(&metrics).unwrap())
     }
@@ -397,8 +417,10 @@ impl RawDeltaTable {
         writer_properties=None,
         custom_metadata=None,
     ))]
+    #[allow(clippy::too_many_arguments)]
     pub fn compact_optimize(
         &mut self,
+        py: Python,
         partition_filters: Option<Vec<(&str, &str, PartitionFilterValue)>>,
         target_size: Option<i64>,
         max_concurrent_tasks: Option<usize>,
@@ -406,38 +428,40 @@ impl RawDeltaTable {
         writer_properties: Option<HashMap<String, Option<String>>>,
         custom_metadata: Option<HashMap<String, String>>,
     ) -> PyResult<String> {
-        let mut cmd = OptimizeBuilder::new(
-            self._table.log_store(),
-            self._table.snapshot().map_err(PythonError::from)?.clone(),
-        )
-        .with_max_concurrent_tasks(max_concurrent_tasks.unwrap_or_else(num_cpus::get));
-        if let Some(size) = target_size {
-            cmd = cmd.with_target_size(size);
-        }
-        if let Some(commit_interval) = min_commit_interval {
-            cmd = cmd.with_min_commit_interval(time::Duration::from_secs(commit_interval));
-        }
+        let (table, metrics) = py.allow_threads(|| {
+            let mut cmd = OptimizeBuilder::new(
+                self._table.log_store(),
+                self._table.snapshot().map_err(PythonError::from)?.clone(),
+            )
+            .with_max_concurrent_tasks(max_concurrent_tasks.unwrap_or_else(num_cpus::get));
+            if let Some(size) = target_size {
+                cmd = cmd.with_target_size(size);
+            }
+            if let Some(commit_interval) = min_commit_interval {
+                cmd = cmd.with_min_commit_interval(time::Duration::from_secs(commit_interval));
+            }
 
-        if let Some(writer_props) = writer_properties {
-            cmd = cmd.with_writer_properties(
-                set_writer_properties(writer_props).map_err(PythonError::from)?,
-            );
-        }
+            if let Some(writer_props) = writer_properties {
+                cmd = cmd.with_writer_properties(
+                    set_writer_properties(writer_props).map_err(PythonError::from)?,
+                );
+            }
 
-        if let Some(metadata) = custom_metadata {
-            let json_metadata: Map<String, Value> =
-                metadata.into_iter().map(|(k, v)| (k, v.into())).collect();
-            cmd = cmd
-                .with_commit_properties(CommitProperties::default().with_metadata(json_metadata));
-        };
+            if let Some(metadata) = custom_metadata {
+                let json_metadata: Map<String, Value> =
+                    metadata.into_iter().map(|(k, v)| (k, v.into())).collect();
+                cmd = cmd.with_commit_properties(
+                    CommitProperties::default().with_metadata(json_metadata),
+                );
+            };
 
-        let converted_filters = convert_partition_filters(partition_filters.unwrap_or_default())
-            .map_err(PythonError::from)?;
-        cmd = cmd.with_filters(&converted_filters);
+            let converted_filters =
+                convert_partition_filters(partition_filters.unwrap_or_default())
+                    .map_err(PythonError::from)?;
+            cmd = cmd.with_filters(&converted_filters);
 
-        let (table, metrics) = rt()
-            .block_on(cmd.into_future())
-            .map_err(PythonError::from)?;
+            rt().block_on(cmd.into_future()).map_err(PythonError::from)
+        })?;
         self._table.state = table.state;
         Ok(serde_json::to_string(&metrics).unwrap())
     }
@@ -454,6 +478,7 @@ impl RawDeltaTable {
     custom_metadata=None,))]
     pub fn z_order_optimize(
         &mut self,
+        py: Python,
         z_order_columns: Vec<String>,
         partition_filters: Option<Vec<(&str, &str, PartitionFilterValue)>>,
         target_size: Option<i64>,
@@ -463,40 +488,42 @@ impl RawDeltaTable {
         writer_properties: Option<HashMap<String, Option<String>>>,
         custom_metadata: Option<HashMap<String, String>>,
     ) -> PyResult<String> {
-        let mut cmd = OptimizeBuilder::new(
-            self._table.log_store(),
-            self._table.snapshot().map_err(PythonError::from)?.clone(),
-        )
-        .with_max_concurrent_tasks(max_concurrent_tasks.unwrap_or_else(num_cpus::get))
-        .with_max_spill_size(max_spill_size)
-        .with_type(OptimizeType::ZOrder(z_order_columns));
-        if let Some(size) = target_size {
-            cmd = cmd.with_target_size(size);
-        }
-        if let Some(commit_interval) = min_commit_interval {
-            cmd = cmd.with_min_commit_interval(time::Duration::from_secs(commit_interval));
-        }
+        let (table, metrics) = py.allow_threads(|| {
+            let mut cmd = OptimizeBuilder::new(
+                self._table.log_store(),
+                self._table.snapshot().map_err(PythonError::from)?.clone(),
+            )
+            .with_max_concurrent_tasks(max_concurrent_tasks.unwrap_or_else(num_cpus::get))
+            .with_max_spill_size(max_spill_size)
+            .with_type(OptimizeType::ZOrder(z_order_columns));
+            if let Some(size) = target_size {
+                cmd = cmd.with_target_size(size);
+            }
+            if let Some(commit_interval) = min_commit_interval {
+                cmd = cmd.with_min_commit_interval(time::Duration::from_secs(commit_interval));
+            }
 
-        if let Some(writer_props) = writer_properties {
-            cmd = cmd.with_writer_properties(
-                set_writer_properties(writer_props).map_err(PythonError::from)?,
-            );
-        }
+            if let Some(writer_props) = writer_properties {
+                cmd = cmd.with_writer_properties(
+                    set_writer_properties(writer_props).map_err(PythonError::from)?,
+                );
+            }
 
-        if let Some(metadata) = custom_metadata {
-            let json_metadata: Map<String, Value> =
-                metadata.into_iter().map(|(k, v)| (k, v.into())).collect();
-            cmd = cmd
-                .with_commit_properties(CommitProperties::default().with_metadata(json_metadata));
-        };
+            if let Some(metadata) = custom_metadata {
+                let json_metadata: Map<String, Value> =
+                    metadata.into_iter().map(|(k, v)| (k, v.into())).collect();
+                cmd = cmd.with_commit_properties(
+                    CommitProperties::default().with_metadata(json_metadata),
+                );
+            };
 
-        let converted_filters = convert_partition_filters(partition_filters.unwrap_or_default())
-            .map_err(PythonError::from)?;
-        cmd = cmd.with_filters(&converted_filters);
+            let converted_filters =
+                convert_partition_filters(partition_filters.unwrap_or_default())
+                    .map_err(PythonError::from)?;
+            cmd = cmd.with_filters(&converted_filters);
 
-        let (table, metrics) = rt()
-            .block_on(cmd.into_future())
-            .map_err(PythonError::from)?;
+            rt().block_on(cmd.into_future()).map_err(PythonError::from)
+        })?;
         self._table.state = table.state;
         Ok(serde_json::to_string(&metrics).unwrap())
     }
@@ -504,28 +531,30 @@ impl RawDeltaTable {
     #[pyo3(signature = (constraints, custom_metadata=None))]
     pub fn add_constraints(
         &mut self,
+        py: Python,
         constraints: HashMap<String, String>,
         custom_metadata: Option<HashMap<String, String>>,
     ) -> PyResult<()> {
-        let mut cmd = ConstraintBuilder::new(
-            self._table.log_store(),
-            self._table.snapshot().map_err(PythonError::from)?.clone(),
-        );
+        let table = py.allow_threads(|| {
+            let mut cmd = ConstraintBuilder::new(
+                self._table.log_store(),
+                self._table.snapshot().map_err(PythonError::from)?.clone(),
+            );
 
-        for (col_name, expression) in constraints {
-            cmd = cmd.with_constraint(col_name.clone(), expression.clone());
-        }
+            for (col_name, expression) in constraints {
+                cmd = cmd.with_constraint(col_name.clone(), expression.clone());
+            }
 
-        if let Some(metadata) = custom_metadata {
-            let json_metadata: Map<String, Value> =
-                metadata.into_iter().map(|(k, v)| (k, v.into())).collect();
-            cmd = cmd
-                .with_commit_properties(CommitProperties::default().with_metadata(json_metadata));
-        };
+            if let Some(metadata) = custom_metadata {
+                let json_metadata: Map<String, Value> =
+                    metadata.into_iter().map(|(k, v)| (k, v.into())).collect();
+                cmd = cmd.with_commit_properties(
+                    CommitProperties::default().with_metadata(json_metadata),
+                );
+            };
 
-        let table = rt()
-            .block_on(cmd.into_future())
-            .map_err(PythonError::from)?;
+            rt().block_on(cmd.into_future()).map_err(PythonError::from)
+        })?;
         self._table.state = table.state;
         Ok(())
     }
@@ -533,27 +562,29 @@ impl RawDeltaTable {
     #[pyo3(signature = (name, raise_if_not_exists, custom_metadata=None))]
     pub fn drop_constraints(
         &mut self,
+        py: Python,
         name: String,
         raise_if_not_exists: bool,
         custom_metadata: Option<HashMap<String, String>>,
     ) -> PyResult<()> {
-        let mut cmd = DropConstraintBuilder::new(
-            self._table.log_store(),
-            self._table.snapshot().map_err(PythonError::from)?.clone(),
-        )
-        .with_constraint(name)
-        .with_raise_if_not_exists(raise_if_not_exists);
+        let table = py.allow_threads(|| {
+            let mut cmd = DropConstraintBuilder::new(
+                self._table.log_store(),
+                self._table.snapshot().map_err(PythonError::from)?.clone(),
+            )
+            .with_constraint(name)
+            .with_raise_if_not_exists(raise_if_not_exists);
 
-        if let Some(metadata) = custom_metadata {
-            let json_metadata: Map<String, Value> =
-                metadata.into_iter().map(|(k, v)| (k, v.into())).collect();
-            cmd = cmd
-                .with_commit_properties(CommitProperties::default().with_metadata(json_metadata));
-        };
+            if let Some(metadata) = custom_metadata {
+                let json_metadata: Map<String, Value> =
+                    metadata.into_iter().map(|(k, v)| (k, v.into())).collect();
+                cmd = cmd.with_commit_properties(
+                    CommitProperties::default().with_metadata(json_metadata),
+                );
+            };
 
-        let table = rt()
-            .block_on(cmd.into_future())
-            .map_err(PythonError::from)?;
+            rt().block_on(cmd.into_future()).map_err(PythonError::from)
+        })?;
         self._table.state = table.state;
         Ok(())
     }
@@ -885,7 +916,7 @@ impl RawDeltaTable {
     ) -> PyResult<Vec<(String, Option<&'py PyAny>)>> {
         let path_set = match partition_filters {
             Some(filters) => Some(HashSet::<_>::from_iter(
-                self.files_by_partitions(filters)?.iter().cloned(),
+                self.files_by_partitions(py, filters)?.iter().cloned(),
             )),
             None => None,
         };
@@ -995,8 +1026,10 @@ impl RawDeltaTable {
         PyFrozenSet::new(py, active_partitions)
     }
 
+    #[allow(clippy::too_many_arguments)]
     fn create_write_transaction(
         &mut self,
+        py: Python,
         add_actions: Vec<PyAddAction>,
         mode: &str,
         partition_by: Vec<String>,
@@ -1004,107 +1037,110 @@ impl RawDeltaTable {
         partitions_filters: Option<Vec<(&str, &str, PartitionFilterValue)>>,
         custom_metadata: Option<HashMap<String, String>>,
     ) -> PyResult<()> {
-        let mode = mode.parse().map_err(PythonError::from)?;
+        py.allow_threads(|| {
+            let mode = mode.parse().map_err(PythonError::from)?;
 
-        let schema: StructType = (&schema.0).try_into().map_err(PythonError::from)?;
+            let schema: StructType = (&schema.0).try_into().map_err(PythonError::from)?;
 
-        let existing_schema = self._table.get_schema().map_err(PythonError::from)?;
+            let existing_schema = self._table.get_schema().map_err(PythonError::from)?;
 
-        let mut actions: Vec<Action> = add_actions
-            .iter()
-            .map(|add| Action::Add(add.into()))
-            .collect();
+            let mut actions: Vec<Action> = add_actions
+                .iter()
+                .map(|add| Action::Add(add.into()))
+                .collect();
 
-        match mode {
-            SaveMode::Overwrite => {
-                let converted_filters =
-                    convert_partition_filters(partitions_filters.unwrap_or_default())
+            match mode {
+                SaveMode::Overwrite => {
+                    let converted_filters =
+                        convert_partition_filters(partitions_filters.unwrap_or_default())
+                            .map_err(PythonError::from)?;
+
+                    let add_actions = self
+                        ._table
+                        .snapshot()
+                        .map_err(PythonError::from)?
+                        .get_active_add_actions_by_partitions(&converted_filters)
                         .map_err(PythonError::from)?;
-
-                let add_actions = self
-                    ._table
-                    .snapshot()
-                    .map_err(PythonError::from)?
-                    .get_active_add_actions_by_partitions(&converted_filters)
-                    .map_err(PythonError::from)?;
-
-                for old_add in add_actions {
-                    let old_add = old_add.map_err(PythonError::from)?;
-                    let remove_action = Action::Remove(Remove {
-                        path: old_add.path().to_string(),
-                        deletion_timestamp: Some(current_timestamp()),
-                        data_change: true,
-                        extended_file_metadata: Some(true),
-                        partition_values: Some(
-                            old_add
-                                .partition_values()
-                                .map_err(PythonError::from)?
-                                .iter()
-                                .map(|(k, v)| {
-                                    (
-                                        k.to_string(),
-                                        if v.is_null() {
-                                            None
-                                        } else {
-                                            Some(v.serialize())
-                                        },
-                                    )
-                                })
-                                .collect(),
-                        ),
-                        size: Some(old_add.size()),
-                        deletion_vector: None,
-                        tags: None,
-                        base_row_id: None,
-                        default_row_commit_version: None,
-                    });
-                    actions.push(remove_action);
-                }
+                    for old_add in add_actions {
+                        let old_add = old_add.map_err(PythonError::from)?;
+                        let remove_action = Action::Remove(Remove {
+                            path: old_add.path().to_string(),
+                            deletion_timestamp: Some(current_timestamp()),
+                            data_change: true,
+                            extended_file_metadata: Some(true),
+                            partition_values: Some(
+                                old_add
+                                    .partition_values()
+                                    .map_err(PythonError::from)?
+                                    .iter()
+                                    .map(|(k, v)| {
+                                        (
+                                            k.to_string(),
+                                            if v.is_null() {
+                                                None
+                                            } else {
+                                                Some(v.serialize())
+                                            },
+                                        )
+                                    })
+                                    .collect(),
+                            ),
+                            size: Some(old_add.size()),
+                            deletion_vector: None,
+                            tags: None,
+                            base_row_id: None,
+                            default_row_commit_version: None,
+                        });
+                        actions.push(remove_action);
+                    }
 
-                // Update metadata with new schema
-                if &schema != existing_schema {
-                    let mut metadata = self._table.metadata().map_err(PythonError::from)?.clone();
-                    metadata.schema_string = serde_json::to_string(&schema)
-                        .map_err(DeltaTableError::from)
-                        .map_err(PythonError::from)?;
-                    actions.push(Action::Metadata(metadata));
+                    // Update metadata with new schema
+                    if &schema != existing_schema {
+                        let mut metadata =
+                            self._table.metadata().map_err(PythonError::from)?.clone();
+                        metadata.schema_string = serde_json::to_string(&schema)
+                            .map_err(DeltaTableError::from)
+                            .map_err(PythonError::from)?;
+                        actions.push(Action::Metadata(metadata));
+                    }
                 }
-            }
-            _ => {
-                // This should be unreachable from Python
-                if &schema != existing_schema {
-                    DeltaProtocolError::new_err("Cannot change schema except in overwrite.");
+                _ => {
+                    // This should be unreachable from Python
+                    if &schema != existing_schema {
+                        DeltaProtocolError::new_err("Cannot change schema except in overwrite.");
+                    }
                 }
             }
-        }
 
-        let operation = DeltaOperation::Write {
-            mode,
-            partition_by: Some(partition_by),
-            predicate: None,
-        };
+            let operation = DeltaOperation::Write {
+                mode,
+                partition_by: Some(partition_by),
+                predicate: None,
+            };
 
-        rt().block_on(
-            CommitBuilder::from(
-                CommitProperties::default().with_metadata(
-                    custom_metadata
-                        .unwrap_or_default()
-                        .into_iter()
-                        .map(|(k, v)| (k, v.into())),
-                ),
-            )
-            .with_actions(actions)
-            .build(
-                Some(self._table.snapshot().map_err(PythonError::from)?),
-                self._table.log_store(),
-                operation,
+            rt().block_on(
+                CommitBuilder::from(
+                    CommitProperties::default().with_metadata(
+                        custom_metadata
+                            .unwrap_or_default()
+                            .into_iter()
+                            .map(|(k, v)| (k, v.into())),
+                    ),
+                )
+                .with_actions(actions)
+                .build(
+                    Some(self._table.snapshot().map_err(PythonError::from)?),
+                    self._table.log_store(),
+                    operation,
+                )
+                .map_err(|err| PythonError::from(DeltaTableError::from(err)))?
+                .into_future(),
             )
-            .map_err(|err| PythonError::from(DeltaTableError::from(err)))?
-            .into_future(),
-        )
-        .map_err(PythonError::from)?;
+            .map_err(PythonError::from)?;
 
-        Ok(())
+            Ok(())
+        })
     }
 
     pub fn get_py_storage_backend(&self) -> PyResult<filesystem::DeltaFileSystemHandler> {
@@ -1115,16 +1151,24 @@ impl RawDeltaTable {
         })
     }
 
-    pub fn create_checkpoint(&self) -> PyResult<()> {
-        rt().block_on(create_checkpoint(&self._table))
-            .map_err(PythonError::from)?;
+    pub fn create_checkpoint(&self, py: Python) -> PyResult<()> {
+        py.allow_threads(|| {
+            Ok::<_, pyo3::PyErr>(
+                rt().block_on(create_checkpoint(&self._table))
+                    .map_err(PythonError::from)?,
+            )
+        })?;
         Ok(())
     }
 
-    pub fn cleanup_metadata(&self) -> PyResult<()> {
-        rt().block_on(cleanup_metadata(&self._table))
-            .map_err(PythonError::from)?;
+    pub fn cleanup_metadata(&self, py: Python) -> PyResult<()> {
+        py.allow_threads(|| {
+            Ok::<_, pyo3::PyErr>(
+                rt().block_on(cleanup_metadata(&self._table))
+                    .map_err(PythonError::from)?,
+            )
+        })?;
         Ok(())
     }
 
@@ -1143,34 +1187,36 @@ impl RawDeltaTable {
     #[pyo3(signature = (predicate = None, writer_properties=None, custom_metadata=None))]
     pub fn delete(
         &mut self,
+        py: Python,
         predicate: Option<String>,
         writer_properties: Option<HashMap<String, Option<String>>>,
         custom_metadata: Option<HashMap<String, String>>,
     ) -> PyResult<String> {
-        let mut cmd = DeleteBuilder::new(
-            self._table.log_store(),
-            self._table.snapshot().map_err(PythonError::from)?.clone(),
-        );
-        if let Some(predicate) = predicate {
-            cmd = cmd.with_predicate(predicate);
-        }
-
-        if let Some(writer_props) = writer_properties {
-            cmd = cmd.with_writer_properties(
-                set_writer_properties(writer_props).map_err(PythonError::from)?,
-            );
-        }
+        let (table, metrics) = py.allow_threads(|| {
+            let mut cmd = DeleteBuilder::new(
+                self._table.log_store(),
+                self._table.snapshot().map_err(PythonError::from)?.clone(),
+            );
+            if let Some(predicate) = predicate {
+                cmd = cmd.with_predicate(predicate);
+            }
 
-        if let Some(metadata) = custom_metadata {
-            let json_metadata: Map<String, Value> =
-                metadata.into_iter().map(|(k, v)| (k, v.into())).collect();
-            cmd = cmd
-                .with_commit_properties(CommitProperties::default().with_metadata(json_metadata));
-        };
+            if let Some(writer_props) = writer_properties {
+                cmd = cmd.with_writer_properties(
+                    set_writer_properties(writer_props).map_err(PythonError::from)?,
+                );
+            }
 
-        let (table, metrics) = rt()
-            .block_on(cmd.into_future())
-            .map_err(PythonError::from)?;
+            if let Some(metadata) = custom_metadata {
+                let json_metadata: Map<String, Value> =
+                    metadata.into_iter().map(|(k, v)| (k, v.into())).collect();
+                cmd = cmd.with_commit_properties(
+                    CommitProperties::default().with_metadata(json_metadata),
+                );
+            };
+
+            rt().block_on(cmd.into_future()).map_err(PythonError::from)
+        })?;
         self._table.state = table.state;
         Ok(serde_json::to_string(&metrics).unwrap())
     }
@@ -1208,6 +1254,7 @@ impl RawDeltaTable {
     #[pyo3(signature = (dry_run = true, custom_metadata = None))]
     pub fn repair(
         &mut self,
+        _py: Python,
         dry_run: bool,
         custom_metadata: Option<HashMap<String, String>>,
     ) -> PyResult<String> {
@@ -1599,6 +1646,7 @@ fn write_to_deltalake(
 #[pyfunction]
 #[allow(clippy::too_many_arguments)]
 fn create_deltalake(
+    py: Python,
     table_uri: String,
     schema: PyArrowType<ArrowSchema>,
     partition_by: Vec<String>,
@@ -1609,47 +1657,50 @@ fn create_deltalake(
     storage_options: Option<HashMap<String, String>>,
     custom_metadata: Option<HashMap<String, String>>,
 ) -> PyResult<()> {
-    let table = DeltaTableBuilder::from_uri(table_uri)
-        .with_storage_options(storage_options.unwrap_or_default())
-        .build()
-        .map_err(PythonError::from)?;
-
-    let mode = mode.parse().map_err(PythonError::from)?;
-    let schema: StructType = (&schema.0).try_into().map_err(PythonError::from)?;
-
-    let mut builder = DeltaOps(table)
-        .create()
-        .with_columns(schema.fields().clone())
-        .with_save_mode(mode)
-        .with_partition_columns(partition_by);
-
-    if let Some(name) = &name {
-        builder = builder.with_table_name(name);
-    };
+    py.allow_threads(|| {
+        let table = DeltaTableBuilder::from_uri(table_uri)
+            .with_storage_options(storage_options.unwrap_or_default())
+            .build()
+            .map_err(PythonError::from)?;
 
-    if let Some(description) = &description {
-        builder = builder.with_comment(description);
-    };
+        let mode = mode.parse().map_err(PythonError::from)?;
+        let schema: StructType = (&schema.0).try_into().map_err(PythonError::from)?;
 
-    if let Some(config) = configuration {
-        builder = builder.with_configuration(config);
-    };
+        let mut builder = DeltaOps(table)
+            .create()
+            .with_columns(schema.fields().clone())
+            .with_save_mode(mode)
+            .with_partition_columns(partition_by);
 
-    if let Some(metadata) = custom_metadata {
-        let json_metadata: Map<String, Value> =
-            metadata.into_iter().map(|(k, v)| (k, v.into())).collect();
-        builder = builder.with_metadata(json_metadata);
-    };
+        if let Some(name) = &name {
+            builder = builder.with_table_name(name);
+        };
 
-    rt().block_on(builder.into_future())
-        .map_err(PythonError::from)?;
+        if let Some(description) = &description {
+            builder = builder.with_comment(description);
+        };
 
-    Ok(())
+        if let Some(config) = configuration {
+            builder = builder.with_configuration(config);
+        };
+
+        if let Some(metadata) = custom_metadata {
+            let json_metadata: Map<String, Value> =
+                metadata.into_iter().map(|(k, v)| (k, v.into())).collect();
+            builder = builder.with_metadata(json_metadata);
+        };
+
+        rt().block_on(builder.into_future())
+            .map_err(PythonError::from)?;
+
+        Ok(())
+    })
 }
 
 #[pyfunction]
 #[allow(clippy::too_many_arguments)]
 fn write_new_deltalake(
+    py: Python,
     table_uri: String,
     schema: PyArrowType<ArrowSchema>,
     add_actions: Vec<PyAddAction>,
@@ -1661,46 +1712,49 @@ fn write_new_deltalake(
     storage_options: Option<HashMap<String, String>>,
     custom_metadata: Option<HashMap<String, String>>,
 ) -> PyResult<()> {
-    let table = DeltaTableBuilder::from_uri(table_uri)
-        .with_storage_options(storage_options.unwrap_or_default())
-        .build()
-        .map_err(PythonError::from)?;
+    py.allow_threads(|| {
+        let table = DeltaTableBuilder::from_uri(table_uri)
+            .with_storage_options(storage_options.unwrap_or_default())
+            .build()
+            .map_err(PythonError::from)?;
 
-    let schema: StructType = (&schema.0).try_into().map_err(PythonError::from)?;
+        let schema: StructType = (&schema.0).try_into().map_err(PythonError::from)?;
 
-    let mut builder = DeltaOps(table)
-        .create()
-        .with_columns(schema.fields().clone())
-        .with_partition_columns(partition_by)
-        .with_actions(add_actions.iter().map(|add| Action::Add(add.into())));
+        let mut builder = DeltaOps(table)
+            .create()
+            .with_columns(schema.fields().clone())
+            .with_partition_columns(partition_by)
+            .with_actions(add_actions.iter().map(|add| Action::Add(add.into())));
 
-    if let Some(name) = &name {
-        builder = builder.with_table_name(name);
-    };
+        if let Some(name) = &name {
+            builder = builder.with_table_name(name);
+        };
 
-    if let Some(description) = &description {
-        builder = builder.with_comment(description);
-    };
+        if let Some(description) = &description {
+            builder = builder.with_comment(description);
+        };
 
-    if let Some(config) = configuration {
-        builder = builder.with_configuration(config);
-    };
+        if let Some(config) = configuration {
+            builder = builder.with_configuration(config);
+        };
 
-    if let Some(metadata) = custom_metadata {
-        let json_metadata: Map<String, Value> =
-            metadata.into_iter().map(|(k, v)| (k, v.into())).collect();
-        builder = builder.with_metadata(json_metadata);
-    };
+        if let Some(metadata) = custom_metadata {
+            let json_metadata: Map<String, Value> =
+                metadata.into_iter().map(|(k, v)| (k, v.into())).collect();
+            builder = builder.with_metadata(json_metadata);
+        };
 
-    rt().block_on(builder.into_future())
-        .map_err(PythonError::from)?;
+        rt().block_on(builder.into_future())
+            .map_err(PythonError::from)?;
 
-    Ok(())
+        Ok(())
+    })
 }
 
 #[pyfunction]
 #[allow(clippy::too_many_arguments)]
 fn convert_to_deltalake(
+    py: Python,
     uri: String,
     partition_schema: Option<PyArrowType<ArrowSchema>>,
     partition_strategy: Option<String>,
@@ -1710,43 +1764,46 @@ fn convert_to_deltalake(
     storage_options: Option<HashMap<String, String>>,
     custom_metadata: Option<HashMap<String, String>>,
 ) -> PyResult<()> {
-    let mut builder = ConvertToDeltaBuilder::new().with_location(uri);
+    py.allow_threads(|| {
+        let mut builder = ConvertToDeltaBuilder::new().with_location(uri);
 
-    if let Some(part_schema) = partition_schema {
-        let schema: StructType = (&part_schema.0).try_into().map_err(PythonError::from)?;
-        builder = builder.with_partition_schema(schema.fields().clone());
-    }
+        if let Some(part_schema) = partition_schema {
+            let schema: StructType = (&part_schema.0).try_into().map_err(PythonError::from)?;
+            builder = builder.with_partition_schema(schema.fields().clone());
+        }
 
-    if let Some(partition_strategy) = &partition_strategy {
-        let strategy: PartitionStrategy = partition_strategy.parse().map_err(PythonError::from)?;
-        builder = builder.with_partition_strategy(strategy);
-    }
+        if let Some(partition_strategy) = &partition_strategy {
+            let strategy: PartitionStrategy =
+                partition_strategy.parse().map_err(PythonError::from)?;
+            builder = builder.with_partition_strategy(strategy);
+        }
 
-    if let Some(name) = &name {
-        builder = builder.with_table_name(name);
-    }
+        if let Some(name) = &name {
+            builder = builder.with_table_name(name);
+        }
 
-    if let Some(description) = &description {
-        builder = builder.with_comment(description);
-    }
+        if let Some(description) = &description {
+            builder = builder.with_comment(description);
        }
 
-    if let Some(config) = configuration {
-        builder = builder.with_configuration(config);
-    };
+        if let Some(config) = configuration {
+            builder = builder.with_configuration(config);
+        };
 
-    if let Some(strg_options) = storage_options {
-        builder = builder.with_storage_options(strg_options);
-    };
+        if let Some(strg_options) = storage_options {
+            builder = builder.with_storage_options(strg_options);
+        };
 
-    if let Some(metadata) = custom_metadata {
-        let json_metadata: Map<String, Value> =
-            metadata.into_iter().map(|(k, v)| (k, v.into())).collect();
-        builder = builder.with_metadata(json_metadata);
-    };
+        if let Some(metadata) = custom_metadata {
+            let json_metadata: Map<String, Value> =
+                metadata.into_iter().map(|(k, v)| (k, v.into())).collect();
+            builder = builder.with_metadata(json_metadata);
+        };
 
-    rt().block_on(builder.into_future())
-        .map_err(PythonError::from)?;
-    Ok(())
+        rt().block_on(builder.into_future())
+            .map_err(PythonError::from)?;
+        Ok(())
+    })
 }
 
 #[pyfunction]
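
Editor's note (not part of the patch): every hunk above applies the same pattern — the blocking call into the Tokio runtime is wrapped in pyo3's Python::allow_threads, which releases the GIL while Rust blocks and re-acquires it before returning to Python. A minimal, self-contained sketch of that pattern follows; it is illustrative only, and the function name and the sleep standing in for blocking work are hypothetical, not taken from delta-rs:

    use std::{thread, time::Duration};
    use pyo3::prelude::*;

    /// Hypothetical example: release the GIL around a blocking operation so
    /// other Python threads can keep running in the meantime.
    #[pyfunction]
    fn blocking_call(py: Python<'_>) -> PyResult<u64> {
        py.allow_threads(|| {
            // Stand-in for rt().block_on(...): long-running, GIL-free Rust work.
            thread::sleep(Duration::from_millis(100));
            Ok(42)
        })
    }

Because the closure runs without the GIL, it must not touch Python objects; accordingly, the methods in the patch only move plain Rust values (strings, maps, options, filters) into the allow_threads closure.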