diff --git a/crates/deltalake-core/src/errors.rs b/crates/deltalake-core/src/errors.rs index aaa21a4801..58098876d2 100644 --- a/crates/deltalake-core/src/errors.rs +++ b/crates/deltalake-core/src/errors.rs @@ -76,6 +76,10 @@ pub enum DeltaTableError { #[error("Invalid table version: {0}")] InvalidVersion(i64), + /// Error returned when the DeltaTable has no delta log version. + #[error("Delta log not found for table version: {0}")] + DeltaLogNotFound(i64), + /// Error returned when the DeltaTable has no data files. #[error("Corrupted table, cannot read data file {}: {}", .path, .source)] MissingDataFile { diff --git a/crates/deltalake-core/src/table/mod.rs b/crates/deltalake-core/src/table/mod.rs index de6a176e91..5c7b89fb70 100644 --- a/crates/deltalake-core/src/table/mod.rs +++ b/crates/deltalake-core/src/table/mod.rs @@ -6,6 +6,7 @@ use std::fmt; use std::fmt::Formatter; use std::{cmp::max, cmp::Ordering, collections::HashSet}; +use bytes::Bytes; use chrono::{DateTime, Utc}; use futures::StreamExt; use lazy_static::lazy_static; @@ -455,6 +456,23 @@ impl DeltaTable { self.update().await } + /// Get the commit obj from the version + pub async fn get_obj_from_version( + &self, + current_version: i64, + ) -> Result { + let commit_log_bytes = match self.log_store.read_commit_entry(current_version).await { + Ok(bytes) => Ok(bytes), + Err(DeltaTableError::ObjectStore { + source: ObjectStoreError::NotFound { .. }, + }) => { + return Err(DeltaTableError::DeltaLogNotFound(current_version)); + } + Err(err) => Err(err), + }?; + Ok(commit_log_bytes) + } + /// Get the list of actions for the next commit pub async fn peek_next_commit( &self, diff --git a/crates/deltalake-core/tests/data/simple_table_missing_commit/_delta_log/00000000000000000000.json b/crates/deltalake-core/tests/data/simple_table_missing_commit/_delta_log/00000000000000000000.json new file mode 100644 index 0000000000..ed8c37eaa6 --- /dev/null +++ b/crates/deltalake-core/tests/data/simple_table_missing_commit/_delta_log/00000000000000000000.json @@ -0,0 +1,4 @@ +{"commitInfo":{"timestamp":1587968586154,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","partitionBy":"[]"},"isBlindAppend":true}} +{"protocol":{"minReaderVersion":1,"minWriterVersion":2}} +{"metaData":{"id":"5fba94ed-9794-4965-ba6e-6ee3c0d22af9","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1587968585495}} +{"add":{"path":"part-00000-a72b1fb3-f2df-41fe-a8f0-e65b746382dd-c000.snappy.parquet","partitionValues":{},"size":262,"modificationTime":1587968586000,"dataChange":true}} diff --git a/crates/deltalake-core/tests/data/simple_table_missing_commit/_delta_log/00000000000000000001.json b/crates/deltalake-core/tests/data/simple_table_missing_commit/_delta_log/00000000000000000001.json new file mode 100644 index 0000000000..9b13b788f7 --- /dev/null +++ b/crates/deltalake-core/tests/data/simple_table_missing_commit/_delta_log/00000000000000000001.json @@ -0,0 +1,2 @@ +{"commitInfo":{"timestamp":1587968596254,"operation":"MERGE","operationParameters":{"predicate":"(oldData.`id` = newData.`id`)"},"readVersion":0,"isBlindAppend":false}} +{"add":{"path":"part-00004-95c9bc2c-ac85-4581-b3cc-84502b0c314f-c000.snappy.parquet","partitionValues":{},"size":429,"modificationTime":1587968596000,"dataChange":true}} diff --git a/crates/deltalake-core/tests/data/simple_table_missing_commit/_delta_log/00000000000000000003.json b/crates/deltalake-core/tests/data/simple_table_missing_commit/_delta_log/00000000000000000003.json new file mode 100644 index 0000000000..d29d24c053 --- /dev/null +++ b/crates/deltalake-core/tests/data/simple_table_missing_commit/_delta_log/00000000000000000003.json @@ -0,0 +1,2 @@ +{"commitInfo":{"timestamp":1587968614187,"operation":"UPDATE","operationParameters":{"predicate":"((id#697L % cast(2 as bigint)) = cast(0 as bigint))"},"readVersion":2,"isBlindAppend":false}} +{"add":{"path":"part-00001-bb70d2ba-c196-4df2-9c85-f34969ad3aa9-c000.snappy.parquet","partitionValues":{},"size":429,"modificationTime":1587968614000,"dataChange":true}} diff --git a/crates/deltalake-core/tests/data/simple_table_missing_commit/part-00000-a72b1fb3-f2df-41fe-a8f0-e65b746382dd-c000.snappy.parquet b/crates/deltalake-core/tests/data/simple_table_missing_commit/part-00000-a72b1fb3-f2df-41fe-a8f0-e65b746382dd-c000.snappy.parquet new file mode 100644 index 0000000000..3706170963 Binary files /dev/null and b/crates/deltalake-core/tests/data/simple_table_missing_commit/part-00000-a72b1fb3-f2df-41fe-a8f0-e65b746382dd-c000.snappy.parquet differ diff --git a/crates/deltalake-core/tests/data/simple_table_missing_commit/part-00001-bb70d2ba-c196-4df2-9c85-f34969ad3aa9-c000.snappy.parquet b/crates/deltalake-core/tests/data/simple_table_missing_commit/part-00001-bb70d2ba-c196-4df2-9c85-f34969ad3aa9-c000.snappy.parquet new file mode 100644 index 0000000000..8f3aadf7bb Binary files /dev/null and b/crates/deltalake-core/tests/data/simple_table_missing_commit/part-00001-bb70d2ba-c196-4df2-9c85-f34969ad3aa9-c000.snappy.parquet differ diff --git a/crates/deltalake-core/tests/data/simple_table_missing_commit/part-00004-95c9bc2c-ac85-4581-b3cc-84502b0c314f-c000.snappy.parquet b/crates/deltalake-core/tests/data/simple_table_missing_commit/part-00004-95c9bc2c-ac85-4581-b3cc-84502b0c314f-c000.snappy.parquet new file mode 100644 index 0000000000..5770a94ab6 Binary files /dev/null and b/crates/deltalake-core/tests/data/simple_table_missing_commit/part-00004-95c9bc2c-ac85-4581-b3cc-84502b0c314f-c000.snappy.parquet differ diff --git a/python/deltalake/_internal.pyi b/python/deltalake/_internal.pyi index 228488d91a..ecbfe0459d 100644 --- a/python/deltalake/_internal.pyi +++ b/python/deltalake/_internal.pyi @@ -42,6 +42,7 @@ class RawDeltaTable: ) -> str: ... def table_uri(self) -> str: ... def version(self) -> int: ... + def get_obj(self, version: int) -> bytes: ... def get_latest_version(self) -> int: ... def metadata(self) -> RawDeltaTableMetaData: ... def protocol_versions(self) -> List[int]: ... diff --git a/python/deltalake/table.py b/python/deltalake/table.py index e7b7613599..f94007168c 100644 --- a/python/deltalake/table.py +++ b/python/deltalake/table.py @@ -42,7 +42,7 @@ from deltalake._internal import create_deltalake as _create_deltalake from deltalake._util import encode_partition_value from deltalake.data_catalog import DataCatalog -from deltalake.exceptions import DeltaProtocolError +from deltalake.exceptions import DeltaError, DeltaProtocolError from deltalake.fs import DeltaStorageHandler from deltalake.schema import Schema as DeltaSchema @@ -256,6 +256,7 @@ def __init__( """ self._storage_options = storage_options + self._latest_version = -1 self._table = RawDeltaTable( str(table_uri), version=version, @@ -897,6 +898,36 @@ def update_incremental(self) -> None: """ self._table.update_incremental() + def get_latest_version(self) -> int: + """ + Get latest version of commit. + """ + return self._table.get_latest_version() + + def peek_next_commit( + self, version: int + ) -> Tuple[Optional[List[Dict[Any, Any]]], int]: + """ + Peek next commit of the input version. + """ + actions = [] + next_version = version + 1 + if next_version > self._latest_version: + self._latest_version = self.get_latest_version() + while next_version <= self._latest_version: + try: + commit_log_bytes = self._table.get_obj(next_version) + for commit_action in commit_log_bytes.split(b"\n"): + if commit_action: + actions.append(json.loads(commit_action)) + return actions, next_version + except DeltaError as e: + if str(e) == f"Delta log not found for table version: {next_version}": + next_version += 1 + else: + raise + return None, version + def create_checkpoint(self) -> None: self._table.create_checkpoint() diff --git a/python/src/lib.rs b/python/src/lib.rs index 5741bd40d2..86a32d6e03 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -43,7 +43,7 @@ use deltalake::DeltaOps; use deltalake::DeltaTableBuilder; use pyo3::exceptions::{PyIOError, PyRuntimeError, PyValueError}; use pyo3::prelude::*; -use pyo3::types::{PyFrozenSet, PyType}; +use pyo3::types::{PyBytes, PyFrozenSet, PyType}; use serde_json::{Map, Value}; use crate::error::DeltaProtocolError; @@ -154,6 +154,13 @@ impl RawDeltaTable { Ok(self._table.version()) } + pub fn get_obj<'py>(&self, py: Python<'py>, version: i64) -> PyResult<&'py PyBytes> { + let commit_log_bytes = rt()? + .block_on(self._table.get_obj_from_version(version)) + .map_err(PythonError::from)?; + return Ok(PyBytes::new(py, &commit_log_bytes)); + } + pub fn metadata(&self) -> PyResult { let metadata = self._table.metadata().map_err(PythonError::from)?; Ok(RawDeltaTableMetaData { diff --git a/python/tests/test_table_read.py b/python/tests/test_table_read.py index a49374e710..782ed2ca03 100644 --- a/python/tests/test_table_read.py +++ b/python/tests/test_table_read.py @@ -493,6 +493,29 @@ def test_writer_fails_on_protocol(): dt.to_pandas() +@pytest.mark.parametrize("version, expected", [(2, (5, 3))]) +def test_peek_next_commit(version, expected): + table_path = "../crates/deltalake-core/tests/data/simple_table" + dt = DeltaTable(table_path) + actions, next_version = dt.peek_next_commit(version=version) + assert (len(actions), next_version) == expected + + +def test_delta_log_not_found(): + table_path = "../crates/deltalake-core/tests/data/simple_table" + dt = DeltaTable(table_path) + latest_version = dt.get_latest_version() + _, version = dt.peek_next_commit(version=latest_version) + assert version == latest_version + + +def test_delta_log_missed(): + table_path = "../crates/deltalake-core/tests/data/simple_table_missing_commit" + dt = DeltaTable(table_path) + _, version = dt.peek_next_commit(version=1) + assert version == 3 # Missed commit version 2, should return version 3 + + class ExcPassThroughThread(Thread): """Wrapper around `threading.Thread` that propagates exceptions."""