Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: expose peek next commit function to python #1937

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions crates/deltalake-core/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ pub enum DeltaTableError {
#[error("Invalid table version: {0}")]
InvalidVersion(i64),

/// Error returned when the DeltaTable has no delta log version.
#[error("Delta log not found for table version: {0}")]
DeltaLogNotFound(i64),

/// Error returned when the DeltaTable has no data files.
#[error("Corrupted table, cannot read data file {}: {}", .path, .source)]
MissingDataFile {
Expand Down
18 changes: 18 additions & 0 deletions crates/deltalake-core/src/table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::fmt;
use std::fmt::Formatter;
use std::{cmp::max, cmp::Ordering, collections::HashSet};

use bytes::Bytes;
use chrono::{DateTime, Utc};
use futures::StreamExt;
use lazy_static::lazy_static;
Expand Down Expand Up @@ -455,6 +456,23 @@ impl DeltaTable {
self.update().await
}

/// Get the commit obj from the version
pub async fn get_obj_from_version(
&self,
current_version: i64,
) -> Result<Bytes, DeltaTableError> {
let commit_log_bytes = match self.log_store.read_commit_entry(current_version).await {
Ok(bytes) => Ok(bytes),
Err(DeltaTableError::ObjectStore {
source: ObjectStoreError::NotFound { .. },
}) => {
return Err(DeltaTableError::DeltaLogNotFound(current_version));
}
Err(err) => Err(err),
}?;
Ok(commit_log_bytes)
}

/// Get the list of actions for the next commit
pub async fn peek_next_commit(
&self,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{"commitInfo":{"timestamp":1587968586154,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","partitionBy":"[]"},"isBlindAppend":true}}
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
{"metaData":{"id":"5fba94ed-9794-4965-ba6e-6ee3c0d22af9","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1587968585495}}
{"add":{"path":"part-00000-a72b1fb3-f2df-41fe-a8f0-e65b746382dd-c000.snappy.parquet","partitionValues":{},"size":262,"modificationTime":1587968586000,"dataChange":true}}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{"commitInfo":{"timestamp":1587968596254,"operation":"MERGE","operationParameters":{"predicate":"(oldData.`id` = newData.`id`)"},"readVersion":0,"isBlindAppend":false}}
{"add":{"path":"part-00004-95c9bc2c-ac85-4581-b3cc-84502b0c314f-c000.snappy.parquet","partitionValues":{},"size":429,"modificationTime":1587968596000,"dataChange":true}}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{"commitInfo":{"timestamp":1587968614187,"operation":"UPDATE","operationParameters":{"predicate":"((id#697L % cast(2 as bigint)) = cast(0 as bigint))"},"readVersion":2,"isBlindAppend":false}}
{"add":{"path":"part-00001-bb70d2ba-c196-4df2-9c85-f34969ad3aa9-c000.snappy.parquet","partitionValues":{},"size":429,"modificationTime":1587968614000,"dataChange":true}}
Binary file not shown.
Binary file not shown.
Binary file not shown.
1 change: 1 addition & 0 deletions python/deltalake/_internal.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class RawDeltaTable:
) -> str: ...
def table_uri(self) -> str: ...
def version(self) -> int: ...
def get_obj(self, version: int) -> bytes: ...
def get_latest_version(self) -> int: ...
def metadata(self) -> RawDeltaTableMetaData: ...
def protocol_versions(self) -> List[int]: ...
Expand Down
33 changes: 32 additions & 1 deletion python/deltalake/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
from deltalake._internal import create_deltalake as _create_deltalake
from deltalake._util import encode_partition_value
from deltalake.data_catalog import DataCatalog
from deltalake.exceptions import DeltaProtocolError
from deltalake.exceptions import DeltaError, DeltaProtocolError
from deltalake.fs import DeltaStorageHandler
from deltalake.schema import Schema as DeltaSchema

Expand Down Expand Up @@ -256,6 +256,7 @@ def __init__(

"""
self._storage_options = storage_options
self._latest_version = -1
self._table = RawDeltaTable(
str(table_uri),
version=version,
Expand Down Expand Up @@ -897,6 +898,36 @@ def update_incremental(self) -> None:
"""
self._table.update_incremental()

def get_latest_version(self) -> int:
"""
Get latest version of commit.
"""
return self._table.get_latest_version()

def peek_next_commit(
self, version: int
) -> Tuple[Optional[List[Dict[Any, Any]]], int]:
"""
Peek next commit of the input version.
"""
actions = []
next_version = version + 1
if next_version > self._latest_version:
self._latest_version = self.get_latest_version()
while next_version <= self._latest_version:
try:
commit_log_bytes = self._table.get_obj(next_version)
for commit_action in commit_log_bytes.split(b"\n"):
if commit_action:
actions.append(json.loads(commit_action))
return actions, next_version
except DeltaError as e:
if str(e) == f"Delta log not found for table version: {next_version}":
next_version += 1
else:
raise
return None, version

def create_checkpoint(self) -> None:
self._table.create_checkpoint()

Expand Down
9 changes: 8 additions & 1 deletion python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use deltalake::DeltaOps;
use deltalake::DeltaTableBuilder;
use pyo3::exceptions::{PyIOError, PyRuntimeError, PyValueError};
use pyo3::prelude::*;
use pyo3::types::{PyFrozenSet, PyType};
use pyo3::types::{PyBytes, PyFrozenSet, PyType};
use serde_json::{Map, Value};

use crate::error::DeltaProtocolError;
Expand Down Expand Up @@ -154,6 +154,13 @@ impl RawDeltaTable {
Ok(self._table.version())
}

pub fn get_obj<'py>(&self, py: Python<'py>, version: i64) -> PyResult<&'py PyBytes> {
let commit_log_bytes = rt()?
.block_on(self._table.get_obj_from_version(version))
.map_err(PythonError::from)?;
return Ok(PyBytes::new(py, &commit_log_bytes));
}

pub fn metadata(&self) -> PyResult<RawDeltaTableMetaData> {
let metadata = self._table.metadata().map_err(PythonError::from)?;
Ok(RawDeltaTableMetaData {
Expand Down
23 changes: 23 additions & 0 deletions python/tests/test_table_read.py
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,29 @@ def test_writer_fails_on_protocol():
dt.to_pandas()


@pytest.mark.parametrize("version, expected", [(2, (5, 3))])
def test_peek_next_commit(version, expected):
table_path = "../crates/deltalake-core/tests/data/simple_table"
dt = DeltaTable(table_path)
actions, next_version = dt.peek_next_commit(version=version)
assert (len(actions), next_version) == expected


def test_delta_log_not_found():
table_path = "../crates/deltalake-core/tests/data/simple_table"
dt = DeltaTable(table_path)
latest_version = dt.get_latest_version()
_, version = dt.peek_next_commit(version=latest_version)
assert version == latest_version


def test_delta_log_missed():
table_path = "../crates/deltalake-core/tests/data/simple_table_missing_commit"
dt = DeltaTable(table_path)
_, version = dt.peek_next_commit(version=1)
assert version == 3 # Missed commit version 2, should return version 3


class ExcPassThroughThread(Thread):
"""Wrapper around `threading.Thread` that propagates exceptions."""

Expand Down