Skip to content

Commit

Permalink
Asyncify APIs (#35)
Browse files Browse the repository at this point in the history
* Asyncify APIs

The async conversion ends up propagating all the way back to the main entrypoints, since
those functions do quite a lot of IO to get data from the storage
backend.

* Update azure dependencies

The latest version includes a fix that ensures a future created by the Azure SDK
is `Send`, making it suitable for use in an `async_trait` trait.

* Use tokio in main Python/Ruby entrypoints

* Fix datafusion test to work with async
  • Loading branch information
sd2k committed Dec 4, 2020
1 parent b2b96d1 commit 8efd96a
Show file tree
Hide file tree
Showing 17 changed files with 295 additions and 192 deletions.
19 changes: 11 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion TODO
@@ -1,4 +1,3 @@
* make all IO async
* handle commitInfo action
* use list api to find transaction logs to apply
* prefetch log content in parallel
3 changes: 3 additions & 0 deletions python/Cargo.toml
Expand Up @@ -11,6 +11,9 @@ readme = "README.md"
name = "deltalake"
crate-type = ["cdylib"]

[dependencies]
tokio = "0.2"

[dependencies.pyo3]
version = "0.12.*"
features = ["extension-module"]
Expand Down
19 changes: 15 additions & 4 deletions python/src/lib.rs
Expand Up @@ -13,19 +13,30 @@ impl PyDeltaTableError {
/// Builds a `pyo3::PyErr` from a `deltalake::DeltaTableError`, using the
/// error's `to_string()` output as the exception message.
fn from_raw(err: deltalake::DeltaTableError) -> pyo3::PyErr {
    let message = err.to_string();
    PyDeltaTableError::new_err(message)
}

/// Builds a `pyo3::PyErr` from a `tokio::io::Error`, using the error's
/// `to_string()` output as the exception message.
fn from_tokio(err: tokio::io::Error) -> pyo3::PyErr {
    let message = err.to_string();
    PyDeltaTableError::new_err(message)
}
}

// Wrapper struct marked `#[pyclass]` that pairs a Delta table with the Tokio
// runtime used to drive the table's async operations to completion.
#[pyclass]
struct RawDeltaTable {
// Tokio runtime; `new` blocks on `deltalake::open_table` with it and
// `load_version` blocks on the table's `load_version` future with it.
_rt: tokio::runtime::Runtime,
// The wrapped `deltalake::DeltaTable`.
_table: deltalake::DeltaTable,
}

#[pymethods]
impl RawDeltaTable {
// Constructor (`#[new]`): opens the Delta table at `table_path` and returns
// the wrapper, or a `PyErr` if the runtime or the table cannot be created.
#[new]
fn new(table_path: &str) -> PyResult<Self> {
// NOTE(review): the next two lines look like the pre-change synchronous
// implementation left in by the diff rendering (the lines after them bind
// `table` again and build the struct with both fields) — confirm.
let table = deltalake::open_table(&table_path).map_err(PyDeltaTableError::from_raw)?;
Ok(RawDeltaTable { _table: table })
// Create a Tokio runtime; a creation failure is converted via `from_tokio`.
let mut rt = tokio::runtime::Runtime::new().map_err(PyDeltaTableError::from_tokio)?;
// Block on the `deltalake::open_table` future; its error is converted via `from_raw`.
let table = rt
.block_on(deltalake::open_table(&table_path))
.map_err(PyDeltaTableError::from_raw)?;
// Store the runtime in the wrapper next to the table.
Ok(RawDeltaTable {
_rt: rt,
_table: table,
})
}

pub fn table_path(&self) -> PyResult<&str> {
Expand All @@ -38,8 +49,8 @@ impl RawDeltaTable {

// Loads the given `version` into the wrapped table; any error is converted
// to a `PyErr` via `PyDeltaTableError::from_raw`.
pub fn load_version(&mut self, version: deltalake::DeltaDataTypeVersion) -> PyResult<()> {
Ok(self
// NOTE(review): the next two lines look like the pre-change direct call
// left in by the diff rendering; the two after them block on the same call
// through the `_rt` runtime — confirm.
._table
.load_version(version)
._rt
.block_on(self._table.load_version(version))
.map_err(PyDeltaTableError::from_raw)?)
}

Expand Down
1 change: 1 addition & 0 deletions ruby/Cargo.toml
Expand Up @@ -9,6 +9,7 @@ crate-type = ["cdylib"]

[dependencies]
helix = "*"
tokio = "0.2"

[dependencies.deltalake]
path = "../rust"
6 changes: 3 additions & 3 deletions ruby/src/lib.rs
@@ -1,4 +1,4 @@
#![recursion_limit="1024"]
#![recursion_limit = "1024"]

#[macro_use]
extern crate helix;
Expand All @@ -7,7 +7,6 @@ extern crate deltalake;
use deltalake::DeltaTable;
use std::sync::Arc;


ruby! {
class Table {
struct {
Expand All @@ -18,7 +17,8 @@ ruby! {
def initialize(helix, table_path: String) {
println!("initializing with {}", table_path);

let table = deltalake::open_table(&table_path).unwrap();
let mut rt = tokio::runtime::Runtime::new().unwrap();
let table = rt.block_on(deltalake::open_table(&table_path)).unwrap();
let actual = Arc::new(table);

Table {
Expand Down
7 changes: 4 additions & 3 deletions rust/Cargo.toml
Expand Up @@ -14,7 +14,7 @@ anyhow = "1.0"
thiserror = "1.0"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tokio = "0.2.10"
tokio = { version = "0.2.23", features = ["fs", "macros", "stream"] }
tokio-io = "0.2.0-alpha.6"
futures = "0.3.1"
bytes = "0.5.3"
Expand All @@ -23,8 +23,8 @@ regex = "1"
chrono = "0"

# Azure
azure_core = { git = "https://github.com/Azure/azure-sdk-for-rust", optional = true, rev = "85e37c2a3765af88873ed9cbd7ac072e920a6127" }
azure_storage = { git = "https://github.com/Azure/azure-sdk-for-rust", optional = true, rev = "85e37c2a3765af88873ed9cbd7ac072e920a6127" }
azure_core = { git = "https://github.com/Azure/azure-sdk-for-rust", optional = true, rev = "bc74cad7cc5f9226cf39c4abae0d902a85765e74" }
azure_storage = { git = "https://github.com/Azure/azure-sdk-for-rust", optional = true, rev = "bc74cad7cc5f9226cf39c4abae0d902a85765e74" }

# S3
rusoto_core = { version = "0.43", optional = true }
Expand All @@ -35,6 +35,7 @@ datafusion = { git = "https://github.com/apache/arrow.git", rev = "c02ed530f989c
parquet = { git = "https://github.com/apache/arrow.git", rev = "c02ed530f989c4e0343ca6ab494c17e7796ed9c1" }
crossbeam = { version = "0", optional = true }
cfg-if = "1"
async-trait = "0.1.42"
# NOTE: disable rust-dataframe integration since it currently doesn't have a
# version published in crates.io
# rust-dataframe = {version = "0.*", optional = true }
Expand Down
9 changes: 5 additions & 4 deletions rust/src/bin/delta-inspect.rs
Expand Up @@ -3,7 +3,8 @@ extern crate deltalake;

use clap::{App, AppSettings, Arg};

fn main() -> anyhow::Result<()> {
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let matches = App::new("Delta table inspector")
.version(env!("CARGO_PKG_VERSION"))
.about("Utility to help inspect Delta talebs")
Expand Down Expand Up @@ -40,11 +41,11 @@ fn main() -> anyhow::Result<()> {
let table_path = files_matches.value_of("path").unwrap();

let table = match files_matches.value_of_t::<i64>("version") {
Ok(v) => deltalake::open_table_with_version(table_path, v)?,
Ok(v) => deltalake::open_table_with_version(table_path, v).await?,
Err(clap::Error {
kind: clap::ErrorKind::ArgumentNotFound,
..
}) => deltalake::open_table(table_path)?,
}) => deltalake::open_table(table_path).await?,
Err(e) => e.exit(),
};

Expand All @@ -58,7 +59,7 @@ fn main() -> anyhow::Result<()> {
}
Some(("info", info_matches)) => {
let table_path = info_matches.value_of("path").unwrap();
let table = deltalake::open_table(table_path)?;
let table = deltalake::open_table(table_path).await?;
println!("{}", table);
}
_ => unreachable!(),
Expand Down

0 comments on commit 8efd96a

Please sign in to comment.