feat: Add integration tests (#118)
liurenjie1024 committed Aug 1, 2023
1 parent dfa2124 commit 1feb3a9
Showing 40 changed files with 1,227 additions and 199 deletions.
23 changes: 22 additions & 1 deletion .github/workflows/ci.yml
@@ -22,7 +22,7 @@ jobs:
run: rustup update stable

- name: Format
run: cargo fmt -p icelake -- --check
run: cargo fmt -p icelake -p icelake-integration-tests -- --check

- name: Clippy
run: cargo clippy --all-targets --workspace --all-features -- -D warnings
@@ -44,3 +44,24 @@ jobs:
run: rustup update stable
- name: Test
run: cargo test --all-targets --all-features

integration-tests:
runs-on: ubuntu-latest
needs: unit
steps:
- uses: actions/checkout@v3
- name: Update Rust Stable
run: rustup update stable
- uses: actions/setup-python@v4
with:
python-version: "3.11"
- name: Setup poetry
uses: abatilo/actions-poetry@v2
with:
poetry-version: "1.5.1"
- name: Build rust
run: cargo build --all-features
- name: Run tests
run: |
cd tests/integration
bash run.sh
1 change: 1 addition & 0 deletions .gitignore
@@ -13,3 +13,4 @@ target/
Cargo.lock

.idea
**/*.iml
14 changes: 12 additions & 2 deletions Cargo.toml
@@ -1,7 +1,12 @@
[workspace.package]
version = "0.0.9"
edition = "2021"
license = "Apache-2.0"

[workspace]
members = [
"icelake",
"tests/integration/rust",
"rest_api"
]

@@ -25,5 +30,10 @@ chrono = "0.4"
faster-hex = "0.8.0"
once_cell = "1"
tempfile = "3"


log = "0.4.0"
env_logger = "0.10.0"
csv = "1"
url = "2"
regex = "1"
clap = { version = "4", features = ["derive"]}
ordered-float = "3.7.0"
3 changes: 3 additions & 0 deletions e2e/append_data_file/catalog/iceberg.properties
@@ -0,0 +1,3 @@
connector.name = iceberg
iceberg.catalog.type = REST
iceberg.rest-catalog.uri = http://iceberg-rest:8181
66 changes: 66 additions & 0 deletions e2e/append_data_file/docker-compose.yml
@@ -0,0 +1,66 @@
version: "3"

services:
trino:
image: trinodb/trino
container_name: trino
user: root
networks:
iceberg_net:
depends_on:
- iceberg-rest
ports:
- 8080:8080
volumes:
- ./catalog:/etc/trino/catalog
iceberg-rest:
image: tabulario/iceberg-rest
container_name: iceberg-rest
user: root
networks:
iceberg_net:
ports:
- 8181:8181
environment:
- AWS_ACCESS_KEY_ID=admin
- AWS_SECRET_ACCESS_KEY=password
- AWS_REGION=us-east-1
- CATALOG_WAREHOUSE=s3://icebergdata/
- CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO
- CATALOG_S3_ENDPOINT=http://minio:9000
minio:
image: minio/minio
container_name: minio
environment:
- MINIO_ROOT_USER=admin
- MINIO_ROOT_PASSWORD=password
- MINIO_DOMAIN=minio
networks:
iceberg_net:
aliases:
- testbucket.minio
ports:
- 9001:9001
- 9000:9000
command: [ "server", "/data", "--console-address", ":9001" ]
mc:
depends_on:
- minio
image: minio/mc
container_name: mc
networks:
iceberg_net:
environment:
- AWS_ACCESS_KEY_ID=admin
- AWS_SECRET_ACCESS_KEY=password
- AWS_REGION=us-east-1
entrypoint: >
/bin/sh -c "
until (/usr/bin/mc config host add minio http://minio:9000 admin password) do echo '...waiting...' && sleep 1; done;
/usr/bin/mc rm -r --force minio/icebergdata;
/usr/bin/mc mb minio/icebergdata;
/usr/bin/mc policy set public minio/icebergdata;
tail -f /dev/null
"
networks:
iceberg_net:
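For orientation, the MinIO service above is the object store the integration tests write Iceberg data into. Below is a minimal sketch, not part of this commit, of how a test might build an opendal S3 operator against that local MinIO instance; the endpoint, credentials, and bucket mirror the compose file, while the builder calls assume an opendal 0.3x-style API and may differ from the version pinned in this workspace.

    use opendal::{services::S3, Operator};

    fn minio_operator() -> opendal::Result<Operator> {
        // Values mirror the docker-compose environment above.
        let mut builder = S3::default();
        builder.bucket("icebergdata");
        builder.endpoint("http://localhost:9000");
        builder.region("us-east-1");
        builder.access_key_id("admin");
        builder.secret_access_key("password");
        builder.root("/");
        // Assumed opendal 0.3x construction path: Operator::new(builder)?.finish().
        Ok(Operator::new(builder)?.finish())
    }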
Empty file.
12 changes: 12 additions & 0 deletions e2e/append_data_file/setup_table.sql
@@ -0,0 +1,12 @@
CREATE SCHEMA s1
WITH (location='s3://icebergdata/s1');

CREATE TABLE t1 (
c1 INTEGER,
c2 VARCHAR,
c3 DOUBLE
)
WITH (
format = 'PARQUET',
location = 's3://icebergdata/s1/t1/'
);
48 changes: 26 additions & 22 deletions icelake/Cargo.toml
@@ -1,35 +1,39 @@
[package]
name = "icelake"
version = "0.0.9"
edition = "2021"
license = "Apache-2.0"
version = { workspace = true }
edition = { workspace = true }
license = { workspace = true }
description = "Pure Rust Iceberg Implementation"

[package.metadata.docs.rs]
all-features = true

[dependencies]
anyhow = {workspace = true}
async-trait = {workspace = true}
apache-avro = {workspace = true}
arrow = {workspace = true}
bytes = {workspace = true}
futures = {workspace = true}
opendal = {workspace = true}
uuid = {workspace = true}
serde = {workspace = true}
serde_json = {workspace = true}
serde_with = {workspace = true}
tokio ={workspace = true}
parquet ={workspace = true}
rust_decimal ={workspace = true}
chrono ={workspace = true}
faster-hex ={workspace = true}
once_cell = {workspace = true}
ordered-float = "3.7.0"
anyhow = { workspace = true }
async-trait = { workspace = true }
arrow = { workspace = true }
bytes = { workspace = true }
futures = { workspace = true }
opendal = { workspace = true }
uuid = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
serde_with = { workspace = true }
tokio = { workspace = true }
parquet = { workspace = true }
rust_decimal = { workspace = true }
chrono = { workspace = true }
faster-hex = { workspace = true }
once_cell = { workspace = true }
url = { workspace = true }
log = { workspace = true }
regex = { workspace = true }
ordered-float = { workspace = true }
apache-avro = { workspace = true }


[dev-dependencies]
tempfile = {workspace = true}
tempfile = { workspace = true }

[[example]]
name = "read_iceberg_table"
18 changes: 18 additions & 0 deletions icelake/src/error.rs
@@ -203,6 +203,24 @@ impl From<std::time::SystemTimeError> for Error {
}
}

impl From<url::ParseError> for Error {
fn from(v: url::ParseError) -> Self {
Self::new(ErrorKind::IcebergDataInvalid, "Can't parse url.").set_source(v)
}
}

impl From<regex::Error> for Error {
fn from(v: regex::Error) -> Self {
Self::new(ErrorKind::Unexpected, "Failed to parse regex").set_source(v)
}
}

impl From<std::num::ParseIntError> for Error {
fn from(v: std::num::ParseIntError) -> Self {
Self::new(ErrorKind::IcebergDataInvalid, "Failed to parse int").set_source(v)
}
}

#[cfg(test)]
mod tests {
use anyhow::anyhow;
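These new conversions mean callers can apply `?` directly to `url`, `regex`, and integer-parsing results inside functions that return this crate's error type. A minimal sketch of the pattern follows; the helper function, its inputs, and the `Result` alias are illustrative assumptions, not code from this commit.

    use icelake::error::Result; // assumes a `Result<T>` alias over this crate's `Error`

    // Hypothetical helper: each `?` below relies on one of the From impls added above.
    fn parse_split(raw_url: &str, raw_limit: &str, pattern: &str) -> Result<(url::Url, usize, regex::Regex)> {
        let url = url::Url::parse(raw_url)?;   // From<url::ParseError>
        let limit: usize = raw_limit.parse()?; // From<std::num::ParseIntError>
        let re = regex::Regex::new(pattern)?;  // From<regex::Error>
        Ok((url, limit, re))
    }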
39 changes: 25 additions & 14 deletions icelake/src/io/data_file_writer.rs
@@ -9,6 +9,7 @@ use crate::{
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use opendal::Operator;
use parquet::file::properties::{WriterProperties, WriterVersion};
use parquet::format::FileMetaData;

use super::{
@@ -20,6 +21,7 @@ use super::{
/// When complete, it will return a list of `DataFile`.
pub struct DataFileWriter {
operator: Operator,
table_location: String,
location_generator: DataFileLocationGenerator,
arrow_schema: SchemaRef,

@@ -41,13 +43,15 @@ impl DataFileWriter {
/// Create a new `DataFileWriter`.
pub async fn try_new(
operator: Operator,
table_location: String,
location_generator: DataFileLocationGenerator,
arrow_schema: SchemaRef,
rows_divisor: usize,
target_file_size_in_bytes: u64,
) -> Result<Self> {
let mut writer = Self {
operator,
table_location,
location_generator,
arrow_schema,
rows_divisor,
@@ -113,8 +117,14 @@ impl DataFileWriter {

let location = self.location_generator.generate_name();
let file_writer = self.operator.writer(&location).await?;
let current_writer =
ParquetWriterBuilder::new(file_writer, self.arrow_schema.clone()).build()?;
let current_writer = {
let props = WriterProperties::builder()
.set_writer_version(WriterVersion::PARQUET_2_0)
.build();
ParquetWriterBuilder::new(file_writer, self.arrow_schema.clone())
.with_properties(props)
.build()?
};
self.current_writer = Some(current_writer);
self.current_row_num = 0;
self.current_location = location;
@@ -125,6 +135,7 @@ impl DataFileWriter {
///
/// This function may be refactor when we support more file format.
fn convert_meta_to_datafile(&self, meta_data: FileMetaData, written_size: u64) -> DataFile {
log::info!("{meta_data:?}");
let (column_sizes, value_counts, null_value_counts, distinct_counts) = {
// how to decide column id
let mut per_col_size: HashMap<i32, _> = HashMap::new();
@@ -170,11 +181,11 @@ impl DataFileWriter {
};
DataFile {
content: crate::types::DataContentType::Data,
file_path: format!("{}/{}", self.operator.info().root(), self.current_location),
file_path: format!("{}/{}", &self.table_location, &self.current_location),
file_format: crate::types::DataFileFormat::Parquet,
/// # NOTE
///
/// DataFileWriter only response to write data. Partition should place by more high level writer.
// /// # NOTE
// ///
// /// DataFileWriter only response to write data. Partition should place by more high level writer.
partition: StructValue::default(),
record_count: meta_data.num_rows,
column_sizes: Some(column_sizes),
@@ -189,17 +200,16 @@ impl DataFileWriter {
/// - `file_size_in_bytes` can't get from `FileMetaData` now.
/// - `file_offset` in `FileMetaData` always be None now.
/// - `nan_value_counts` can't get from `FileMetaData` now.
split_offsets: Some(
meta_data
.row_groups
.iter()
.filter_map(|group| group.file_offset)
.collect(),
),
// Currently arrow parquet writer doesn't fill row group offsets, we can use first column chunk offset for it.
split_offsets: meta_data
.row_groups
.iter()
.filter_map(|group| group.columns.get(0).map(|c| c.file_offset))
.collect(),
nan_value_counts: None,
lower_bounds: None,
upper_bounds: None,
equality_ids: None,
equality_ids: vec![],
sort_order_id: None,
}
}
@@ -250,6 +260,7 @@ mod test {

let mut writer = data_file_writer::DataFileWriter::try_new(
op.clone(),
"/tmp/table".to_string(),
location_generator,
to_write.schema(),
1024,
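One detail worth isolating from the hunk above: because arrow's parquet writer leaves `RowGroup::file_offset` unset, the split offsets are now derived from each row group's first column chunk. A standalone restatement of that logic (the function name is illustrative):

    use parquet::format::FileMetaData;

    // Mirrors the new split_offsets computation: fall back to the first
    // column chunk's file_offset for every row group.
    fn split_offsets(meta: &FileMetaData) -> Vec<i64> {
        meta.row_groups
            .iter()
            .filter_map(|group| group.columns.first().map(|c| c.file_offset))
            .collect()
    }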
2 changes: 1 addition & 1 deletion icelake/src/io/parquet/track_writer.rs
@@ -35,7 +35,7 @@ impl AsyncWrite for TrackWriter {
match Pin::new(&mut self.writer).poll_write(cx, buf) {
std::task::Poll::Ready(Ok(n)) => {
self.written_size
.fetch_add(n as u64, std::sync::atomic::Ordering::Relaxed);
.fetch_add(buf.len() as u64, std::sync::atomic::Ordering::SeqCst);
std::task::Poll::Ready(Ok(n))
}
std::task::Poll::Ready(Err(e)) => std::task::Poll::Ready(Err(e)),
8 changes: 5 additions & 3 deletions icelake/src/io/parquet/write.rs
@@ -93,16 +93,18 @@ impl ParquetWriter {
///
/// This function must be called before complete the write process.
pub async fn close(self) -> Result<(FileMetaData, u64)> {
let written_size = self.get_written_size();
Ok((self.writer.close().await?, written_size))
let written_size = self.written_size.clone();
let file_metadata = self.writer.close().await?;
let written_size = written_size.load(std::sync::atomic::Ordering::SeqCst);
Ok((file_metadata, written_size))
}

/// Return the written size.
///
/// # Note
/// The size is incorrect until we call close (data could be still in buffer). It is only used as a suggestion.
pub fn get_written_size(&self) -> u64 {
self.written_size.load(std::sync::atomic::Ordering::Relaxed)
self.written_size.load(std::sync::atomic::Ordering::SeqCst)
}
}

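The two hunks above cooperate: `TrackWriter` bumps a shared atomic counter as bytes pass through it, and `ParquetWriter::close` now reads that counter only after the underlying writer has been closed, so buffered bytes are included. A simplified, synchronous sketch of the shared-counter pattern, using illustrative names rather than the crate's actual types:

    use std::sync::atomic::{AtomicU64, Ordering};
    use std::sync::Arc;

    // Inner sink: counts every byte handed to it, like TrackWriter.
    struct CountingSink {
        written: Arc<AtomicU64>,
        buf: Vec<u8>,
    }

    impl CountingSink {
        fn write(&mut self, bytes: &[u8]) {
            self.buf.extend_from_slice(bytes);
            self.written.fetch_add(bytes.len() as u64, Ordering::SeqCst);
        }
    }

    // Outer writer: holds the same counter and reads it only after finishing.
    struct SizedWriter {
        written: Arc<AtomicU64>,
        sink: CountingSink,
    }

    impl SizedWriter {
        fn close(self) -> (Vec<u8>, u64) {
            let written = Arc::clone(&self.written); // keep a handle, as the close() change does
            let data = self.sink.buf;                // "closing" the sink flushes everything here
            (data, written.load(Ordering::SeqCst))
        }
    }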
3 changes: 3 additions & 0 deletions icelake/src/io/task_writer.rs
@@ -66,6 +66,7 @@ impl TaskWriter {
Ok(Self::Unpartitioned(
UnpartitionedWriter::try_new(
schema,
table_metadata.location.clone(),
location_generator::DataFileLocationGenerator::try_new(
&table_metadata,
partition_id,
@@ -108,12 +109,14 @@ impl UnpartitionedWriter {
/// Create a new `TaskWriter`.
pub async fn try_new(
schema: ArrowSchema,
table_location: String,
location_generator: DataFileLocationGenerator,
operator: Operator,
) -> Result<Self> {
Ok(Self {
data_file_writer: DataFileWriter::try_new(
operator,
table_location,
location_generator,
schema.into(),
1024,