Improve C Data Interface and Add Integration Testing Entrypoints #5080

Merged · 10 commits · Nov 20, 2023
8 changes: 7 additions & 1 deletion arrow-data/src/ffi.rs
@@ -168,6 +168,12 @@ impl FFI_ArrowArray {
             .collect::<Box<_>>();
         let n_children = children.len() as i64;
 
+        // As in the IPC format, emit null_count = length for Null type
+        let null_count = match data.data_type() {
+            DataType::Null => data.len(),
+            _ => data.null_count(),
+        };
+
         // create the private data owning everything.
         // any other data must be added here, e.g. via a struct, to track lifetime.
         let mut private_data = Box::new(ArrayPrivateData {
@@ -179,7 +185,7 @@ impl FFI_ArrowArray {
 
         Self {
             length: data.len() as i64,
-            null_count: data.null_count() as i64,
+            null_count: null_count as i64,
             offset: data.offset() as i64,
             n_buffers,
             n_children,
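Only the exported struct changes here; the in-memory array still reports zero nulls because the Null type carries no validity buffer. A minimal sketch of the resulting behavior, assuming the arrow crate is built with its ffi feature and that FFI_ArrowArray exposes a null_count() accessor:

```rust
// Hypothetical sketch, not part of this diff: exporting a NullArray through
// the C Data Interface should now advertise null_count == length, matching
// the IPC format's convention for the Null type.
use arrow::array::{Array, NullArray};
use arrow::ffi::FFI_ArrowArray;

fn main() {
    let array = NullArray::new(3);
    // The in-memory array has no validity buffer, so it reports zero nulls.
    assert_eq!(array.null_count(), 0);

    // The exported C struct, however, now reports every slot as null.
    let exported = FFI_ArrowArray::new(&array.to_data());
    assert_eq!(exported.null_count(), 3);
}
```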
5 changes: 4 additions & 1 deletion arrow-integration-testing/Cargo.toml
@@ -27,11 +27,14 @@ edition = { workspace = true }
 publish = false
 rust-version = { workspace = true }
 
+[lib]
+crate-type = ["lib", "cdylib"]
+
 [features]
 logging = ["tracing-subscriber"]
 
 [dependencies]
-arrow = { path = "../arrow", default-features = false, features = ["test_utils", "ipc", "ipc_compression", "json"] }
+arrow = { path = "../arrow", default-features = false, features = ["test_utils", "ipc", "ipc_compression", "json", "ffi"] }
 arrow-flight = { path = "../arrow-flight", default-features = false }
 arrow-buffer = { path = "../arrow-buffer", default-features = false }
 arrow-integration-test = { path = "../arrow-integration-test", default-features = false }
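Adding the cdylib crate type is what lets a non-Rust integration harness load this crate as a shared library and resolve exported C entrypoints for the C Data Interface round-trip tests; keeping "lib" alongside it lets the crate's existing Rust binaries and tests keep linking against it normally. A rough sketch of the kind of export a cdylib enables; the function below is purely illustrative and is not one of the symbols this PR adds:

```rust
// Hypothetical sketch of a C entrypoint exported from a cdylib. A non-Rust
// test harness can dlopen the built library and resolve this unmangled
// symbol; the name and signature here are illustrative only.
use std::ffi::CStr;
use std::os::raw::c_char;

#[no_mangle]
pub unsafe extern "C" fn hypothetical_path_len(path: *const c_char) -> i64 {
    // Defensive null check, then measure the NUL-terminated C string.
    if path.is_null() {
        return -1;
    }
    CStr::from_ptr(path).to_bytes().len() as i64
}
```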
2 changes: 1 addition & 1 deletion arrow-integration-testing/README.md
@@ -48,7 +48,7 @@ ln -s <path_to_arrow_rs> arrow/rust
 
 ```shell
 cd arrow
-pip install -e dev/archery[docker]
+pip install -e dev/archery[integration]
 ```
 
 ### Build the C++ binaries:
49 changes: 5 additions & 44 deletions arrow-integration-testing/src/bin/arrow-json-integration-test.rs
@@ -15,16 +15,13 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use arrow::datatypes::{DataType, Field};
-use arrow::datatypes::{Fields, Schema};
 use arrow::error::{ArrowError, Result};
 use arrow::ipc::reader::FileReader;
 use arrow::ipc::writer::FileWriter;
 use arrow_integration_test::*;
-use arrow_integration_testing::read_json_file;
+use arrow_integration_testing::{canonicalize_schema, open_json_file};
 use clap::Parser;
 use std::fs::File;
-use std::sync::Arc;
 
 #[derive(clap::ValueEnum, Debug, Clone)]
 #[clap(rename_all = "SCREAMING_SNAKE_CASE")]
@@ -66,12 +63,12 @@ fn json_to_arrow(json_name: &str, arrow_name: &str, verbose: bool) -> Result<()>
         eprintln!("Converting {json_name} to {arrow_name}");
     }
 
-    let json_file = read_json_file(json_name)?;
+    let json_file = open_json_file(json_name)?;
 
     let arrow_file = File::create(arrow_name)?;
     let mut writer = FileWriter::try_new(arrow_file, &json_file.schema)?;
 
-    for b in json_file.batches {
+    for b in json_file.read_batches()? {
         writer.write(&b)?;
     }
 
@@ -113,49 +110,13 @@ fn arrow_to_json(arrow_name: &str, json_name: &str, verbose: bool) -> Result<()>
     Ok(())
 }
 
-fn canonicalize_schema(schema: &Schema) -> Schema {
-    let fields = schema
-        .fields()
-        .iter()
-        .map(|field| match field.data_type() {
-            DataType::Map(child_field, sorted) => match child_field.data_type() {
-                DataType::Struct(fields) if fields.len() == 2 => {
-                    let first_field = fields.get(0).unwrap();
-                    let key_field =
-                        Arc::new(Field::new("key", first_field.data_type().clone(), false));
-                    let second_field = fields.get(1).unwrap();
-                    let value_field = Arc::new(Field::new(
-                        "value",
-                        second_field.data_type().clone(),
-                        second_field.is_nullable(),
-                    ));
-
-                    let fields = Fields::from([key_field, value_field]);
-                    let struct_type = DataType::Struct(fields);
-                    let child_field = Field::new("entries", struct_type, false);
-
-                    Arc::new(Field::new(
-                        field.name().as_str(),
-                        DataType::Map(Arc::new(child_field), *sorted),
-                        field.is_nullable(),
-                    ))
-                }
-                _ => panic!("The child field of Map type should be Struct type with 2 fields."),
-            },
-            _ => field.clone(),
-        })
-        .collect::<Fields>();
-
-    Schema::new(fields).with_metadata(schema.metadata().clone())
-}
-
 fn validate(arrow_name: &str, json_name: &str, verbose: bool) -> Result<()> {
     if verbose {
         eprintln!("Validating {arrow_name} and {json_name}");
     }
 
     // open JSON file
-    let json_file = read_json_file(json_name)?;
+    let json_file = open_json_file(json_name)?;
 
     // open Arrow file
     let arrow_file = File::open(arrow_name)?;
@@ -170,7 +131,7 @@
         )));
     }
 
-    let json_batches = &json_file.batches;
+    let json_batches = json_file.read_batches()?;
 
     // compare number of batches
     assert!(
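With these changes the binary leans on open_json_file and read_batches from the arrow_integration_testing library crate, which now also hosts canonicalize_schema. A small consolidated sketch of that API as it is used above; the wrapper function and file paths are illustrative:

```rust
// Hypothetical sketch of the JSON -> IPC conversion using the library API
// this binary now relies on; paths are placeholders.
use arrow::error::Result;
use arrow::ipc::writer::FileWriter;
use arrow_integration_testing::open_json_file;
use std::fs::File;

fn convert(json_path: &str, arrow_path: &str) -> Result<()> {
    // The handle exposes the parsed schema directly...
    let json_file = open_json_file(json_path)?;
    let mut writer = FileWriter::try_new(File::create(arrow_path)?, &json_file.schema)?;

    // ...and defers decoding of the record batches to read_batches().
    for batch in json_file.read_batches()? {
        writer.write(&batch)?;
    }
    writer.finish()
}
```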
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::{read_json_file, ArrowFile};
+use crate::open_json_file;
 use std::collections::HashMap;
 
 use arrow::{
@@ -45,23 +45,16 @@ pub async fn run_scenario(host: &str, port: u16, path: &str) -> Result {
 
     let client = FlightServiceClient::connect(url).await?;
 
-    let ArrowFile {
-        schema, batches, ..
-    } = read_json_file(path)?;
+    let json_file = open_json_file(path)?;
 
-    let schema = Arc::new(schema);
+    let batches = json_file.read_batches()?;
+    let schema = Arc::new(json_file.schema);
 
     let mut descriptor = FlightDescriptor::default();
     descriptor.set_type(DescriptorType::Path);
     descriptor.path = vec![path.to_string()];
 
-    upload_data(
-        client.clone(),
-        schema.clone(),
-        descriptor.clone(),
-        batches.clone(),
-    )
-    .await?;
+    upload_data(client.clone(), schema, descriptor.clone(), batches.clone()).await?;
     verify_data(client, descriptor, &batches).await?;
 
     Ok(())
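For completeness, a sketch of driving this client scenario by hand; the module path, port, and file name are assumptions, since the scenario is normally invoked through the integration-test binaries rather than a standalone program:

```rust
// Hypothetical sketch: run the Flight integration client scenario against a
// server already listening on the given port. Module path, port, and the
// JSON file name are placeholders.
use arrow_integration_testing::flight_client_scenarios::integration_test;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    // Uploads every batch from the JSON file to the Flight server, then
    // fetches the data back and verifies the round trip.
    integration_test::run_scenario("localhost", 31337, "generated_primitive.json").await?;
    Ok(())
}
```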