diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 00c702e..e795ba3 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -22,7 +22,7 @@ jobs: run: rustup update stable - name: Format - run: cargo fmt -p icelake -- --check + run: cargo fmt -p icelake -p icelake-integration-tests -- --check - name: Clippy run: cargo clippy --all-targets --workspace --all-features -- -D warnings @@ -44,3 +44,24 @@ jobs: run: rustup update stable - name: Test run: cargo test --all-targets --all-features + + integration-tests: + runs-on: ubuntu-latest + needs: unit + steps: + - uses: actions/checkout@v3 + - name: Update Rust Stable + run: rustup update stable + - uses: actions/setup-python@v4 + with: + python-version: "3.11" + - name: Setup poetry + uses: abatilo/actions-poetry@v2 + with: + poetry-version: "1.5.1" + - name: Build rust + run: cargo build --all-features + - name: Run tests + run: | + cd tests/integration + bash run.sh diff --git a/.gitignore b/.gitignore index e5c57f9..97a0d42 100644 --- a/.gitignore +++ b/.gitignore @@ -13,3 +13,4 @@ target/ Cargo.lock .idea +**/*.iml \ No newline at end of file diff --git a/Cargo.toml b/Cargo.toml index 644f92b..de82019 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,12 @@ +[workspace.package] +version = "0.0.9" +edition = "2021" +license = "Apache-2.0" [workspace] members = [ "icelake", + "tests/integration/rust", "rest_api" ] @@ -25,5 +30,10 @@ chrono = "0.4" faster-hex = "0.8.0" once_cell = "1" tempfile = "3" - - +log = "0.4.0" +env_logger = "0.10.0" +csv = "1" +url = "2" +regex = "1" +clap = { version = "4", features = ["derive"]} +ordered-float = "3.7.0" diff --git a/e2e/append_data_file/catalog/iceberg.properties b/e2e/append_data_file/catalog/iceberg.properties new file mode 100644 index 0000000..7610fd4 --- /dev/null +++ b/e2e/append_data_file/catalog/iceberg.properties @@ -0,0 +1,3 @@ +connector.name = iceberg +iceberg.catalog.type = REST +iceberg.rest-catalog.uri = http://iceberg-rest:8181 diff --git a/e2e/append_data_file/docker-compose.yml b/e2e/append_data_file/docker-compose.yml new file mode 100644 index 0000000..385f4bd --- /dev/null +++ b/e2e/append_data_file/docker-compose.yml @@ -0,0 +1,66 @@ +version: "3" + +services: + trino: + image: trinodb/trino + container_name: trino + user: root + networks: + iceberg_net: + depends_on: + - iceberg-rest + ports: + - 8080:8080 + volumes: + - ./catalog:/etc/trino/catalog + iceberg-rest: + image: tabulario/iceberg-rest + container_name: iceberg-rest + user: root + networks: + iceberg_net: + ports: + - 8181:8181 + environment: + - AWS_ACCESS_KEY_ID=admin + - AWS_SECRET_ACCESS_KEY=password + - AWS_REGION=us-east-1 + - CATALOG_WAREHOUSE=s3://icebergdata/ + - CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO + - CATALOG_S3_ENDPOINT=http://minio:9000 + minio: + image: minio/minio + container_name: minio + environment: + - MINIO_ROOT_USER=admin + - MINIO_ROOT_PASSWORD=password + - MINIO_DOMAIN=minio + networks: + iceberg_net: + aliases: + - testbucket.minio + ports: + - 9001:9001 + - 9000:9000 + command: [ "server", "/data", "--console-address", ":9001" ] + mc: + depends_on: + - minio + image: minio/mc + container_name: mc + networks: + iceberg_net: + environment: + - AWS_ACCESS_KEY_ID=admin + - AWS_SECRET_ACCESS_KEY=password + - AWS_REGION=us-east-1 + entrypoint: > + /bin/sh -c " + until (/usr/bin/mc config host add minio http://minio:9000 admin password) do echo '...waiting...' 
&& sleep 1; done; + /usr/bin/mc rm -r --force minio/icebergdata; + /usr/bin/mc mb minio/icebergdata; + /usr/bin/mc policy set public minio/icebergdata; + tail -f /dev/null + " +networks: + iceberg_net: diff --git a/e2e/append_data_file/setup_table.output b/e2e/append_data_file/setup_table.output new file mode 100644 index 0000000..e69de29 diff --git a/e2e/append_data_file/setup_table.sql b/e2e/append_data_file/setup_table.sql new file mode 100644 index 0000000..d1eed3d --- /dev/null +++ b/e2e/append_data_file/setup_table.sql @@ -0,0 +1,12 @@ +CREATE SCHEMA s1 +WITH (location='s3://icebergdata/s1'); + +CREATE TABLE t1 ( + c1 INTEGER, + c2 VARCHAR, + c3 DOUBLE +) +WITH ( + format = 'PARQUET', + location = 's3://icebergdata/s1/t1/' +); \ No newline at end of file diff --git a/icelake/Cargo.toml b/icelake/Cargo.toml index c1a11dc..e5277d0 100644 --- a/icelake/Cargo.toml +++ b/icelake/Cargo.toml @@ -1,35 +1,39 @@ [package] name = "icelake" -version = "0.0.9" -edition = "2021" -license = "Apache-2.0" +version = { workspace = true } +edition = { workspace = true } +license = { workspace = true } description = "Pure Rust Iceberg Implementation" [package.metadata.docs.rs] all-features = true [dependencies] -anyhow = {workspace = true} -async-trait = {workspace = true} -apache-avro = {workspace = true} -arrow = {workspace = true} -bytes = {workspace = true} -futures = {workspace = true} -opendal = {workspace = true} -uuid = {workspace = true} -serde = {workspace = true} -serde_json = {workspace = true} -serde_with = {workspace = true} -tokio ={workspace = true} -parquet ={workspace = true} -rust_decimal ={workspace = true} -chrono ={workspace = true} -faster-hex ={workspace = true} -once_cell = {workspace = true} -ordered-float = "3.7.0" +anyhow = { workspace = true } +async-trait = { workspace = true } +arrow = { workspace = true } +bytes = { workspace = true } +futures = { workspace = true } +opendal = { workspace = true } +uuid = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +serde_with = { workspace = true } +tokio = { workspace = true } +parquet = { workspace = true } +rust_decimal = { workspace = true } +chrono = { workspace = true } +faster-hex = { workspace = true } +once_cell = { workspace = true } +url = { workspace = true } +log = { workspace = true } +regex = { workspace = true } +ordered-float = { workspace = true } +apache-avro = { workspace = true } + [dev-dependencies] -tempfile = {workspace = true} +tempfile = { workspace = true } [[example]] name = "read_iceberg_table" diff --git a/icelake/src/error.rs b/icelake/src/error.rs index 8d64eb9..02cf143 100644 --- a/icelake/src/error.rs +++ b/icelake/src/error.rs @@ -203,6 +203,24 @@ impl From for Error { } } +impl From for Error { + fn from(v: url::ParseError) -> Self { + Self::new(ErrorKind::IcebergDataInvalid, "Can't parse url.").set_source(v) + } +} + +impl From for Error { + fn from(v: regex::Error) -> Self { + Self::new(ErrorKind::Unexpected, "Failed to parse regex").set_source(v) + } +} + +impl From for Error { + fn from(v: std::num::ParseIntError) -> Self { + Self::new(ErrorKind::IcebergDataInvalid, "Failed to parse int").set_source(v) + } +} + #[cfg(test)] mod tests { use anyhow::anyhow; diff --git a/icelake/src/io/data_file_writer.rs b/icelake/src/io/data_file_writer.rs index 42ba8aa..82697fe 100644 --- a/icelake/src/io/data_file_writer.rs +++ b/icelake/src/io/data_file_writer.rs @@ -9,6 +9,7 @@ use crate::{ use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; use 
opendal::Operator; +use parquet::file::properties::{WriterProperties, WriterVersion}; use parquet::format::FileMetaData; use super::{ @@ -20,6 +21,7 @@ use super::{ /// When complete, it will return a list of `DataFile`. pub struct DataFileWriter { operator: Operator, + table_location: String, location_generator: DataFileLocationGenerator, arrow_schema: SchemaRef, @@ -41,6 +43,7 @@ impl DataFileWriter { /// Create a new `DataFileWriter`. pub async fn try_new( operator: Operator, + table_location: String, location_generator: DataFileLocationGenerator, arrow_schema: SchemaRef, rows_divisor: usize, @@ -48,6 +51,7 @@ impl DataFileWriter { ) -> Result { let mut writer = Self { operator, + table_location, location_generator, arrow_schema, rows_divisor, @@ -113,8 +117,14 @@ impl DataFileWriter { let location = self.location_generator.generate_name(); let file_writer = self.operator.writer(&location).await?; - let current_writer = - ParquetWriterBuilder::new(file_writer, self.arrow_schema.clone()).build()?; + let current_writer = { + let props = WriterProperties::builder() + .set_writer_version(WriterVersion::PARQUET_2_0) + .build(); + ParquetWriterBuilder::new(file_writer, self.arrow_schema.clone()) + .with_properties(props) + .build()? + }; self.current_writer = Some(current_writer); self.current_row_num = 0; self.current_location = location; @@ -125,6 +135,7 @@ impl DataFileWriter { /// /// This function may be refactor when we support more file format. fn convert_meta_to_datafile(&self, meta_data: FileMetaData, written_size: u64) -> DataFile { + log::info!("{meta_data:?}"); let (column_sizes, value_counts, null_value_counts, distinct_counts) = { // how to decide column id let mut per_col_size: HashMap = HashMap::new(); @@ -170,11 +181,11 @@ impl DataFileWriter { }; DataFile { content: crate::types::DataContentType::Data, - file_path: format!("{}/{}", self.operator.info().root(), self.current_location), + file_path: format!("{}/{}", &self.table_location, &self.current_location), file_format: crate::types::DataFileFormat::Parquet, - /// # NOTE - /// - /// DataFileWriter only response to write data. Partition should place by more high level writer. + // /// # NOTE + // /// + // /// DataFileWriter only response to write data. Partition should place by more high level writer. partition: StructValue::default(), record_count: meta_data.num_rows, column_sizes: Some(column_sizes), @@ -189,17 +200,16 @@ impl DataFileWriter { /// - `file_size_in_bytes` can't get from `FileMetaData` now. /// - `file_offset` in `FileMetaData` always be None now. /// - `nan_value_counts` can't get from `FileMetaData` now. - split_offsets: Some( - meta_data - .row_groups - .iter() - .filter_map(|group| group.file_offset) - .collect(), - ), + // Currently arrow parquet writer doesn't fill row group offsets, we can use first column chunk offset for it. 
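+            // (Each split offset below is therefore the `file_offset` of the row group's
+            //  first column chunk, approximating where that row group starts in the file.)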
+ split_offsets: meta_data + .row_groups + .iter() + .filter_map(|group| group.columns.get(0).map(|c| c.file_offset)) + .collect(), nan_value_counts: None, lower_bounds: None, upper_bounds: None, - equality_ids: None, + equality_ids: vec![], sort_order_id: None, } } @@ -250,6 +260,7 @@ mod test { let mut writer = data_file_writer::DataFileWriter::try_new( op.clone(), + "/tmp/table".to_string(), location_generator, to_write.schema(), 1024, diff --git a/icelake/src/io/parquet/track_writer.rs b/icelake/src/io/parquet/track_writer.rs index 25b87e6..b6c8be7 100644 --- a/icelake/src/io/parquet/track_writer.rs +++ b/icelake/src/io/parquet/track_writer.rs @@ -35,7 +35,7 @@ impl AsyncWrite for TrackWriter { match Pin::new(&mut self.writer).poll_write(cx, buf) { std::task::Poll::Ready(Ok(n)) => { self.written_size - .fetch_add(n as u64, std::sync::atomic::Ordering::Relaxed); + .fetch_add(buf.len() as u64, std::sync::atomic::Ordering::SeqCst); std::task::Poll::Ready(Ok(n)) } std::task::Poll::Ready(Err(e)) => std::task::Poll::Ready(Err(e)), diff --git a/icelake/src/io/parquet/write.rs b/icelake/src/io/parquet/write.rs index 7053671..0f84b6d 100644 --- a/icelake/src/io/parquet/write.rs +++ b/icelake/src/io/parquet/write.rs @@ -93,8 +93,10 @@ impl ParquetWriter { /// /// This function must be called before complete the write process. pub async fn close(self) -> Result<(FileMetaData, u64)> { - let written_size = self.get_written_size(); - Ok((self.writer.close().await?, written_size)) + let written_size = self.written_size.clone(); + let file_metadata = self.writer.close().await?; + let written_size = written_size.load(std::sync::atomic::Ordering::SeqCst); + Ok((file_metadata, written_size)) } /// Return the written size. @@ -102,7 +104,7 @@ impl ParquetWriter { /// # Note /// The size is incorrect until we call close (data could be still in buffer). It is only used as a suggestion. pub fn get_written_size(&self) -> u64 { - self.written_size.load(std::sync::atomic::Ordering::Relaxed) + self.written_size.load(std::sync::atomic::Ordering::SeqCst) } } diff --git a/icelake/src/io/task_writer.rs b/icelake/src/io/task_writer.rs index dcd2dab..7642693 100644 --- a/icelake/src/io/task_writer.rs +++ b/icelake/src/io/task_writer.rs @@ -66,6 +66,7 @@ impl TaskWriter { Ok(Self::Unpartitioned( UnpartitionedWriter::try_new( schema, + table_metadata.location.clone(), location_generator::DataFileLocationGenerator::try_new( &table_metadata, partition_id, @@ -108,12 +109,14 @@ impl UnpartitionedWriter { /// Create a new `TaskWriter`. 
pub async fn try_new( schema: ArrowSchema, + table_location: String, location_generator: DataFileLocationGenerator, operator: Operator, ) -> Result { Ok(Self { data_file_writer: DataFileWriter::try_new( operator, + table_location, location_generator, schema.into(), 1024, diff --git a/icelake/src/table.rs b/icelake/src/table.rs index 226fed7..1031045 100644 --- a/icelake/src/table.rs +++ b/icelake/src/table.rs @@ -6,15 +6,18 @@ use futures::StreamExt; use opendal::layers::LoggingLayer; use opendal::services::Fs; use opendal::Operator; +use regex::Regex; +use url::Url; use uuid::Uuid; use crate::io::task_writer::TaskWriter; use crate::types::{serialize_table_meta, DataFile, TableMetadata}; -use crate::{types, Error}; +use crate::{types, Error, ErrorKind}; const META_ROOT_PATH: &str = "metadata"; const METADATA_FILE_EXTENSION: &str = ".metadata.json"; const VERSION_HINT_FILENAME: &str = "version-hint.text"; +const VERSIONED_TABLE_METADATA_FILE_PATTERN: &str = r"v([0-9]+).metadata.json"; /// Table is the main entry point for the IceLake. pub struct Table { @@ -27,6 +30,8 @@ pub struct Table { /// We use table's `last-updated-ms` to represent the version. current_version: i64, current_location: Option, + /// It's different from `current_version` in that it's the `v[version number]` in metadata file. + current_table_version: i64, task_id: AtomicUsize, } @@ -42,21 +47,48 @@ impl Table { current_version: 0, current_location: None, task_id: AtomicUsize::new(0), + current_table_version: 0, } } /// Load metadata and manifest from storage. async fn load(&mut self) -> Result<()> { - let path = if self.is_version_hint_exist().await? { + let (cur_table_version, path) = if self.is_version_hint_exist().await? { let version_hint = self.read_version_hint().await?; - format!("metadata/v{}.metadata.json", version_hint) + ( + version_hint, + format!("metadata/v{}.metadata.json", version_hint), + ) } else { let files = self.list_table_metadata_paths().await?; - files.into_iter().last().ok_or(Error::new( + let path = files.into_iter().last().ok_or(Error::new( crate::ErrorKind::IcebergDataInvalid, "no table metadata found", - ))? + ))?; + + let version_hint = { + let re = Regex::new(VERSIONED_TABLE_METADATA_FILE_PATTERN)?; + if re.is_match(path.as_str()) { + let (_, [version]) = re + .captures_iter(path.as_str()) + .map(|c| c.extract()) + .next() + .ok_or_else(|| { + Error::new( + crate::ErrorKind::IcebergDataInvalid, + format!("Invalid metadata file name {path}"), + ) + })?; + version.parse()? + } else { + // This is an ugly workaround to fix ut + log::error!("Hadoop table metadata filename doesn't not match pattern!"); + 0 + } + }; + + (version_hint, path) }; let metadata = self.read_table_metadata(&path).await?; @@ -71,6 +103,7 @@ impl Table { self.current_location = Some(metadata.location.clone()); self.table_metadata .insert(metadata.last_updated_ms, metadata); + self.current_table_version = cur_table_version as i64; Ok(()) } @@ -278,16 +311,73 @@ impl Table { Table::metadata_path(format!("v{metadata_version}{METADATA_FILE_EXTENSION}")) } + /// Returns absolute path in operator. + pub fn absolution_path(op: &Operator, relation_location: &str) -> String { + let op_info = op.info(); + format!( + "{}://{}/{}/{}", + op_info.scheme().into_static(), + op_info.name(), + op_info.root(), + relation_location + ) + } + + async fn rename(op: &Operator, src_path: &str, dest_path: &str) -> Result<()> { + let info = op.info(); + if info.can_rename() { + Ok(op.rename(src_path, dest_path).await?) 
+ } else { + op.copy(src_path, dest_path).await?; + op.delete(src_path).await?; + Ok(()) + } + } + + /// Returns the relative path to operator. + pub fn relative_path(op: &Operator, absolute_path: &str) -> Result { + let url = Url::parse(absolute_path)?; + let op_info = op.info(); + + // TODO: We should check schema here, but how to guarantee schema compatible such as s3, s3a + + if url.host_str() != Some(op_info.name()) { + return Err(Error::new( + ErrorKind::Unexpected, + format!( + "Host in {:?} not match with operator info {}", + url.host_str(), + op_info.name() + ), + )); + } + + url.path() + .strip_prefix(op_info.root()) + .ok_or_else(|| { + Error::new( + crate::ErrorKind::IcebergDataInvalid, + format!( + "path {} is not starts with operator root {}", + absolute_path, + op_info.root() + ), + ) + }) + .map(|s| s.to_string()) + } + pub(crate) fn operator(&self) -> Operator { self.op.clone() } pub(crate) async fn commit(&mut self, next_metadata: TableMetadata) -> Result<()> { - let next_version = self.current_version + 1; + let next_version = self.current_table_version + 1; let tmp_metadata_file_path = Table::metadata_path(format!("{}{METADATA_FILE_EXTENSION}", Uuid::new_v4())); let final_metadata_file_path = Table::metadata_file_path(next_version); + log::debug!("Writing to temporary metadata file path: {tmp_metadata_file_path}"); self.op .write( &tmp_metadata_file_path, @@ -295,9 +385,8 @@ impl Table { ) .await?; - self.op - .rename(&tmp_metadata_file_path, &final_metadata_file_path) - .await?; + log::debug!("Renaming temporary metadata file path [{tmp_metadata_file_path}] to final metadata file path [{final_metadata_file_path}]"); + Table::rename(&self.op, &tmp_metadata_file_path, &final_metadata_file_path).await?; self.write_metadata_version_hint(next_version).await?; // Reload table @@ -306,11 +395,17 @@ impl Table { } async fn write_metadata_version_hint(&self, version: i64) -> Result<()> { - let path = Table::metadata_path(format!("{}-version-hint.temp", Uuid::new_v4())); - self.op.write(&path, format!("{version}")).await?; + let tmp_version_hint_path = + Table::metadata_path(format!("{}-version-hint.temp", Uuid::new_v4())); + self.op + .write(&tmp_version_hint_path, format!("{version}")) + .await?; + + let final_version_hint_path = Table::metadata_path(VERSION_HINT_FILENAME); - self.op.delete(VERSION_HINT_FILENAME).await?; - self.op.rename(&path, VERSION_HINT_FILENAME).await?; + self.op.delete(final_version_hint_path.as_str()).await?; + log::debug!("Renaming temporary version hint file path [{tmp_version_hint_path}] to final metadata file path [{final_version_hint_path}]"); + Table::rename(&self.op, &tmp_version_hint_path, &final_version_hint_path).await?; Ok(()) } diff --git a/icelake/src/transaction.rs b/icelake/src/transaction.rs index 1fceca8..e3e48db 100644 --- a/icelake/src/transaction.rs +++ b/icelake/src/transaction.rs @@ -43,9 +43,9 @@ impl<'a> Transaction<'a> { } /// Append a new data file. - pub fn append_file(mut self, data_file: DataFile) -> Self { - self.ops.push(Operation::AppendDataFile(data_file)); - self + pub fn append_file(&mut self, data_file: impl IntoIterator) { + self.ops + .extend(data_file.into_iter().map(Operation::AppendDataFile)); } /// Commit this transaction. 
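The reworked `append_file` above replaces the builder-style, `self`-consuming call with a `&mut self` method that accepts a whole batch of files (the generic bound is flattened in the hunk; from the `extend` call it is presumably `impl IntoIterator<Item = DataFile>`). A minimal usage sketch, with `Transaction::new` and the async `commit` signature assumed from the existing API surface:

    // Sketch only: `Transaction::new` is assumed, `table` is an opened Table, and
    // `data_files` is any collection of `DataFile` values (e.g. a Vec from a writer).
    let mut txn = Transaction::new(&mut table);
    txn.append_file(data_files);          // any IntoIterator over DataFile
    txn.append_file([extra_data_file]);   // may now be called repeatedly before commit
    txn.commit().await?;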
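The path helpers added to `table.rs` earlier in this patch translate between the operator-relative paths used for I/O and the absolute URIs stored in table metadata. A rough illustration with made-up values, assuming an S3-backed operator whose `info()` reports the bucket name `icebergdata` and root `/demo/s1/t1/`:

    // Illustrative only: the URL host must match op.info().name(), and the operator
    // root is stripped from the URL path to recover the operator-relative location.
    let rel = Table::relative_path(
        &op,
        "s3://icebergdata/demo/s1/t1/metadata/v2.metadata.json",
    )?;
    assert_eq!(rel, "metadata/v2.metadata.json");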
@@ -84,8 +84,11 @@ impl<'a> Transaction<'a> { fn manifest_list_path(ctx: &mut CommitContext, snapshot_id: i64) -> String { ctx.attempt += 1; Table::metadata_path(format!( - "snap-{}-{}-{}", - snapshot_id, ctx.attempt, &ctx.uuid + "snap-{}-{}-{}.{}", + snapshot_id, + ctx.attempt, + &ctx.uuid, + DataFileFormat::Avro.to_string() )) } @@ -121,8 +124,10 @@ impl<'a> Transaction<'a> { let writer = ManifestWriter::new( cur_metadata.current_partition_spec()?.clone(), table.operator(), + cur_metadata.location.as_str(), Transaction::next_manifest_path(&mut ctx), next_snapshot_id, + next_seq_number, ); let manifest_file = ManifestFile { metadata: ManifestMetadata { @@ -155,7 +160,8 @@ impl<'a> Transaction<'a> { .write(manifest_list) .await?; - manifest_list_path + // Absolute path stored in snapshot file + format!("{}/{manifest_list_path}", cur_metadata.location) }; let cur_snapshot = cur_metadata.current_snapshot()?; diff --git a/icelake/src/types/in_memory.rs b/icelake/src/types/in_memory.rs index beeb925..5d15eaf 100644 --- a/icelake/src/types/in_memory.rs +++ b/icelake/src/types/in_memory.rs @@ -18,11 +18,12 @@ use std::hash::Hash; use uuid::Uuid; use crate::types::parse_manifest_list; -use crate::Error; use crate::ErrorKind; use crate::Result; +use crate::{Error, Table}; pub(crate) const UNASSIGNED_SEQ_NUM: i64 = -1; +const MAIN_BRANCH: &str = "main"; /// All data types are either primitives or nested types, which are maps, lists, or structs. #[derive(Debug, PartialEq, Clone)] @@ -821,17 +822,17 @@ pub struct ManifestListEntry { /// /// Number of entries in the manifest that have status ADDED, when null /// this is assumed to be non-zero - pub added_files_count: i32, + pub added_data_files_count: i32, /// field: 505 /// /// Number of entries in the manifest that have status EXISTING (0), /// when null this is assumed to be non-zero - pub existing_files_count: i32, + pub existing_data_files_count: i32, /// field: 506 /// /// Number of entries in the manifest that have status DELETED (2), /// when null this is assumed to be non-zero - pub deleted_files_count: i32, + pub deleted_data_files_count: i32, /// field: 512 /// /// Number of rows in all of files in the manifest that have status @@ -853,7 +854,7 @@ pub struct ManifestListEntry { /// A list of field summaries for each partition field in the spec. Each /// field in the list corresponds to a field in the manifest file’s /// partition spec. 
- pub partitions: Option>, + pub partitions: Vec, /// field: 519 /// /// Implementation-specific key metadata for encryption @@ -864,31 +865,46 @@ mod manifest_list { use super::*; use once_cell::sync::Lazy; pub static MANIFEST_PATH: Lazy = - Lazy::new(|| Field::required(0, "manifest_path", Any::Primitive(Primitive::String))); + Lazy::new(|| Field::required(500, "manifest_path", Any::Primitive(Primitive::String))); pub static MANIFEST_LENGTH: Lazy = - Lazy::new(|| Field::required(1, "manifest_length", Any::Primitive(Primitive::Long))); + Lazy::new(|| Field::required(501, "manifest_length", Any::Primitive(Primitive::Long))); pub static PARTITION_SPEC_ID: Lazy = - Lazy::new(|| Field::required(2, "partition_spec_id", Any::Primitive(Primitive::Int))); + Lazy::new(|| Field::required(502, "partition_spec_id", Any::Primitive(Primitive::Int))); pub static CONTENT: Lazy = - Lazy::new(|| Field::required(3, "content", Any::Primitive(Primitive::Int))); + Lazy::new(|| Field::required(517, "content", Any::Primitive(Primitive::Int))); pub static SEQUENCE_NUMBER: Lazy = - Lazy::new(|| Field::required(4, "sequence_number", Any::Primitive(Primitive::Long))); + Lazy::new(|| Field::required(515, "sequence_number", Any::Primitive(Primitive::Long))); pub static MIN_SEQUENCE_NUMBER: Lazy = - Lazy::new(|| Field::required(5, "min_sequence_number", Any::Primitive(Primitive::Long))); + Lazy::new(|| Field::required(516, "min_sequence_number", Any::Primitive(Primitive::Long))); pub static ADDED_SNAPSHOT_ID: Lazy = - Lazy::new(|| Field::required(6, "added_snapshot_id", Any::Primitive(Primitive::Long))); - pub static ADDED_FILES_COUNT: Lazy = - Lazy::new(|| Field::required(7, "added_files_count", Any::Primitive(Primitive::Int))); - pub static EXISTING_FILES_COUNT: Lazy = - Lazy::new(|| Field::required(8, "existing_files_count", Any::Primitive(Primitive::Int))); - pub static DELETED_FILES_COUNT: Lazy = - Lazy::new(|| Field::required(9, "deleted_files_count", Any::Primitive(Primitive::Int))); + Lazy::new(|| Field::required(503, "added_snapshot_id", Any::Primitive(Primitive::Long))); + pub static ADDED_FILES_COUNT: Lazy = Lazy::new(|| { + Field::required( + 504, + "added_data_files_count", + Any::Primitive(Primitive::Int), + ) + }); + pub static EXISTING_FILES_COUNT: Lazy = Lazy::new(|| { + Field::required( + 505, + "existing_data_files_count", + Any::Primitive(Primitive::Int), + ) + }); + pub static DELETED_FILES_COUNT: Lazy = Lazy::new(|| { + Field::required( + 506, + "deleted_data_files_count", + Any::Primitive(Primitive::Int), + ) + }); pub static ADDED_ROWS_COUNT: Lazy = - Lazy::new(|| Field::required(10, "added_rows_count", Any::Primitive(Primitive::Long))); + Lazy::new(|| Field::required(512, "added_rows_count", Any::Primitive(Primitive::Long))); pub static EXISTING_ROWS_COUNT: Lazy = - Lazy::new(|| Field::required(11, "existing_rows_count", Any::Primitive(Primitive::Long))); + Lazy::new(|| Field::required(513, "existing_rows_count", Any::Primitive(Primitive::Long))); pub static DELETED_ROWS_COUNT: Lazy = - Lazy::new(|| Field::required(12, "deleted_rows_count", Any::Primitive(Primitive::Long))); + Lazy::new(|| Field::required(514, "deleted_rows_count", Any::Primitive(Primitive::Long))); pub static PARTITIONS: Lazy = Lazy::new(|| { Field::optional( 13, @@ -908,7 +924,7 @@ mod manifest_list { ) }); pub static KEY_METADATA: Lazy = - Lazy::new(|| Field::optional(14, "key_metadata", Any::Primitive(Primitive::Binary))); + Lazy::new(|| Field::optional(519, "key_metadata", Any::Primitive(Primitive::Binary))); } /// Field 
summary for partition field in the spec. @@ -1230,7 +1246,7 @@ pub struct DataFile { /// /// Split offsets for the data file. For example, all row group offsets /// in a Parquet file. Must be sorted ascending - pub split_offsets: Option>, + pub split_offsets: Vec, /// field id: 135 /// element field id: 136 /// @@ -1238,7 +1254,7 @@ pub struct DataFile { /// Required when content is EqualityDeletes and should be null /// otherwise. Fields with ids listed in this column must be present /// in the delete file - pub equality_ids: Option>, + pub equality_ids: Vec, /// field id: 140 /// /// ID representing sort order for this file. @@ -1252,12 +1268,12 @@ pub struct DataFile { pub sort_order_id: Option, } -impl DataFile { - /// Set the partition for this data file. - pub fn set_partition(&mut self, partition: StructValue) { - self.partition = partition; - } -} +// impl DataFile { +// /// Set the partition for this data file. +// pub fn set_partition(&mut self, partition: StructValue) { +// self.partition = partition; +// } +// } mod datafile { use super::*; @@ -1423,7 +1439,7 @@ impl DataFile { content, file_path: file_path.into(), file_format, - // TODO: Should not use default partition here. Replace it after introduce deserialize of `StructValue`. + // // TODO: Should not use default partition here. Replace it after introduce deserialize of `StructValue`. partition: StructValue::default(), record_count, file_size_in_bytes, @@ -1435,8 +1451,8 @@ impl DataFile { lower_bounds: None, upper_bounds: None, key_metadata: None, - split_offsets: None, - equality_ids: None, + split_offsets: vec![], + equality_ids: vec![], sort_order_id: None, } } @@ -1562,7 +1578,10 @@ pub struct Snapshot { impl Snapshot { pub(crate) async fn load_manifest_list(&self, op: &Operator) -> Result { - parse_manifest_list(&op.read(self.manifest_list.as_str()).await?) + parse_manifest_list( + &op.read(Table::relative_path(op, self.manifest_list.as_str())?.as_str()) + .await?, + ) } pub(crate) fn log(&self) -> SnapshotLog { @@ -1623,6 +1642,18 @@ pub struct SnapshotReference { pub max_ref_age_ms: Option, } +impl SnapshotReference { + pub(crate) fn new(snapshot_id: i64, typ: SnapshotReferenceType) -> Self { + Self { + snapshot_id, + typ, + min_snapshots_to_keep: None, + max_snapshot_age_ms: None, + max_ref_age_ms: None, + } + } +} + /// Type of the reference #[derive(Debug, PartialEq, Eq, Clone, Copy)] pub enum SnapshotReferenceType { @@ -1757,7 +1788,7 @@ pub struct TableMetadata { /// /// There is always a main branch reference pointing to the /// `current-snapshot-id` even if the refs map is null. - pub refs: Option>, + pub refs: HashMap, } impl TableMetadata { @@ -1808,6 +1839,20 @@ impl TableMetadata { } pub(crate) fn append_snapshot(&mut self, snapshot: Snapshot) -> Result<()> { + self.last_updated_ms = snapshot.timestamp_ms; + self.last_sequence_number = snapshot.sequence_number; + self.current_snapshot_id = Some(snapshot.snapshot_id); + + self.refs + .entry(MAIN_BRANCH.to_string()) + .and_modify(|s| { + s.snapshot_id = snapshot.snapshot_id; + s.typ = SnapshotReferenceType::Branch; + }) + .or_insert_with(|| { + SnapshotReference::new(snapshot.snapshot_id, SnapshotReferenceType::Branch) + }); + if let Some(snapshots) = &mut self.snapshots { self.snapshot_log .as_mut() @@ -1819,7 +1864,6 @@ impl TableMetadata { })? 
.push(snapshot.log()); snapshots.push(snapshot); - Ok(()) } else { if self.snapshot_log.is_some() { return Err(Error::new( @@ -1830,8 +1874,9 @@ impl TableMetadata { self.snapshot_log = Some(vec![snapshot.log()]); self.snapshots = Some(vec![snapshot]); - Ok(()) } + + Ok(()) } } diff --git a/icelake/src/types/on_disk/manifest_file.rs b/icelake/src/types/on_disk/manifest_file.rs index 1af6243..eefd91f 100644 --- a/icelake/src/types/on_disk/manifest_file.rs +++ b/icelake/src/types/on_disk/manifest_file.rs @@ -12,6 +12,7 @@ use serde_with::Bytes; use super::parse_schema; use crate::types::on_disk::partition_spec::serialize_partition_spec_fields; use crate::types::on_disk::schema::serialize_schema; +use crate::types::to_avro::to_avro_schema; use crate::types::{self, StructValue}; use crate::types::{DataContentType, ManifestContentType, ManifestListEntry, UNASSIGNED_SEQ_NUM}; use crate::types::{ManifestStatus, TableFormatVersion}; @@ -159,8 +160,9 @@ struct DataFile { upper_bounds: Option>, #[serde_as(as = "Option")] key_metadata: Option>, - split_offsets: Option>, - equality_ids: Option>, + split_offsets: Vec, + #[serde(default)] + equality_ids: Vec, sort_order_id: Option, } @@ -275,6 +277,7 @@ fn parse_data_file_format(s: &str) -> Result { pub(crate) struct ManifestWriter { partition_spec: types::PartitionSpec, op: Operator, + table_location: String, // Output path relative to operator root. output_path: String, snapshot_id: i64, @@ -285,6 +288,7 @@ pub(crate) struct ManifestWriter { existing_rows: i64, deleted_files: i64, deleted_rows: i64, + seq_num: i64, min_seq_num: Option, } @@ -292,12 +296,15 @@ impl ManifestWriter { pub(crate) fn new( partition_spec: types::PartitionSpec, op: Operator, + table_location: impl Into, output_path: impl Into, snapshot_id: i64, + seq_num: i64, ) -> Self { Self { partition_spec, op, + table_location: table_location.into(), output_path: output_path.into(), snapshot_id, @@ -307,6 +314,7 @@ impl ManifestWriter { existing_rows: 0, deleted_files: 0, deleted_rows: 0, + seq_num, min_seq_num: None, } } @@ -333,8 +341,10 @@ impl ManifestWriter { let partition_type = self .partition_spec .partition_type(&manifest.metadata.schema)?; - avro_schema = - AvroSchema::try_from(&types::ManifestFile::v2_schema(partition_type))?; + avro_schema = to_avro_schema( + &types::ManifestFile::v2_schema(partition_type), + Some("manifest_entry"), + )?; self.v2_writer(&avro_schema, &manifest.metadata.schema)? 
} }; @@ -377,20 +387,20 @@ impl ManifestWriter { self.op.write(self.output_path.as_str(), connect).await?; Ok(ManifestListEntry { - manifest_path: format!("{}/{}", self.op.info().root(), &self.output_path), + manifest_path: format!("{}/{}", self.table_location, &self.output_path), manifest_length: length as i64, partition_spec_id: manifest.metadata.partition_spec_id, content: manifest.metadata.content, - sequence_number: UNASSIGNED_SEQ_NUM, + sequence_number: self.seq_num, min_sequence_number: self.min_seq_num.unwrap_or(UNASSIGNED_SEQ_NUM), added_snapshot_id: self.snapshot_id, - added_files_count: self.added_files as i32, - existing_files_count: self.existing_files as i32, - deleted_files_count: self.deleted_files as i32, + added_data_files_count: self.added_files as i32, + existing_data_files_count: self.existing_files as i32, + deleted_data_files_count: self.deleted_files as i32, added_rows_count: self.added_rows, existing_rows_count: self.existing_rows, deleted_rows_count: self.deleted_rows, - partitions: None, + partitions: Vec::default(), key_metadata: None, }) } @@ -587,12 +597,15 @@ mod tests { async fn check_manifest_file_serde(manifest_file: types::ManifestFile) { let tmp_dir = TempDir::new().unwrap(); - let dir_path = tmp_dir.path().to_str().unwrap(); + let dir_path = { + let canonicalize = canonicalize(tmp_dir.path().to_str().unwrap()).unwrap(); + canonicalize.to_str().unwrap().to_string() + }; let filename = "test.avro"; let operator = { let mut builder = Fs::default(); - builder.root(dir_path); + builder.root(dir_path.as_str()); Operator::new(builder).unwrap().finish() }; @@ -601,14 +614,12 @@ mod tests { fields: vec![], }; - let writer = ManifestWriter::new(partition_spec, operator, filename, 3); + let writer = + ManifestWriter::new(partition_spec, operator, dir_path.as_str(), filename, 3, 1); let manifest_list_entry = writer.write(manifest_file.clone()).await.unwrap(); assert_eq!( - canonicalize(format!("{dir_path}/{filename}")) - .unwrap() - .to_str() - .unwrap(), + format!("{dir_path}/{filename}"), manifest_list_entry.manifest_path ); assert_eq!(manifest_file.metadata.content, manifest_list_entry.content); diff --git a/icelake/src/types/on_disk/manifest_list.rs b/icelake/src/types/on_disk/manifest_list.rs index eaf3c67..78518b5 100644 --- a/icelake/src/types/on_disk/manifest_list.rs +++ b/icelake/src/types/on_disk/manifest_list.rs @@ -7,6 +7,7 @@ use serde::Deserialize; use serde::Serialize; use crate::types; +use crate::types::to_avro::to_avro_schema; use crate::types::ManifestList; use crate::Error; use crate::ErrorKind; @@ -49,18 +50,18 @@ struct ManifestListEntry { #[serde(default)] added_snapshot_id: i64, #[serde(default)] - added_files_count: i32, + added_data_files_count: i32, #[serde(default)] - existing_files_count: i32, + existing_data_files_count: i32, #[serde(default)] - deleted_files_count: i32, + deleted_data_files_count: i32, #[serde(default)] added_rows_count: i64, #[serde(default)] existing_rows_count: i64, #[serde(default)] deleted_rows_count: i64, - partitions: Option>, + partitions: Vec, key_metadata: Option>, } @@ -70,16 +71,11 @@ impl TryFrom for types::ManifestListEntry { fn try_from(v: ManifestListEntry) -> Result { let content = (v.content as u8).try_into()?; - let partitions = match v.partitions { - Some(v) => { - let mut partitions = Vec::with_capacity(v.len()); - for partition in v { - partitions.push(partition.try_into()?); - } - Some(partitions) - } - None => None, - }; + let partitions = v + .partitions + .into_iter() + 
.map(types::FieldSummary::try_from) + .collect::>>()?; Ok(types::ManifestListEntry { manifest_path: v.manifest_path, @@ -89,9 +85,9 @@ impl TryFrom for types::ManifestListEntry { sequence_number: v.sequence_number, min_sequence_number: v.min_sequence_number, added_snapshot_id: v.added_snapshot_id, - added_files_count: v.added_files_count, - existing_files_count: v.existing_files_count, - deleted_files_count: v.deleted_files_count, + added_data_files_count: v.added_data_files_count, + existing_data_files_count: v.existing_data_files_count, + deleted_data_files_count: v.deleted_data_files_count, added_rows_count: v.added_rows_count, existing_rows_count: v.existing_rows_count, deleted_rows_count: v.deleted_rows_count, @@ -105,16 +101,11 @@ impl From for ManifestListEntry { fn from(value: types::ManifestListEntry) -> Self { let content: i32 = value.content as i32; - let partitions = match value.partitions { - Some(v) => { - let mut partitions = Vec::with_capacity(v.len()); - for partition in v { - partitions.push(partition.into()); - } - Some(partitions) - } - None => None, - }; + let partitions = value + .partitions + .into_iter() + .map(FieldSummary::from) + .collect::>(); Self { manifest_path: value.manifest_path, @@ -124,9 +115,9 @@ impl From for ManifestListEntry { sequence_number: value.sequence_number, min_sequence_number: value.min_sequence_number, added_snapshot_id: value.added_snapshot_id, - added_files_count: value.added_files_count, - existing_files_count: value.existing_files_count, - deleted_files_count: value.deleted_files_count, + added_data_files_count: value.added_data_files_count, + existing_data_files_count: value.existing_data_files_count, + deleted_data_files_count: value.deleted_data_files_count, added_rows_count: value.added_rows_count, existing_rows_count: value.existing_rows_count, deleted_rows_count: value.deleted_rows_count, @@ -197,8 +188,8 @@ impl ManifestListWriter { } /// Write manifest list to file. Return the absolute path of the file. 
- pub(crate) async fn write(self, manifest_list: ManifestList) -> Result { - let avro_schema = AvroSchema::try_from(&types::ManifestList::v2_schema())?; + pub(crate) async fn write(self, manifest_list: ManifestList) -> Result<()> { + let avro_schema = to_avro_schema(&types::ManifestList::v2_schema(), Some("manifest_file"))?; let mut avro_writer = self.v2_writer(&avro_schema)?; for entry in manifest_list.entries { @@ -209,7 +200,7 @@ impl ManifestListWriter { let connect = avro_writer.into_inner()?; self.op.write(self.output_path.as_str(), connect).await?; - Ok(format!("{}/{}", self.op.info().root(), &self.output_path)) + Ok(()) } fn v2_writer<'a>(&self, avro_schema: &'a AvroSchema) -> Result>> { @@ -232,7 +223,6 @@ impl ManifestListWriter { mod tests { use std::env; use std::fs; - use std::fs::canonicalize; use std::fs::read; use anyhow::Result; @@ -269,13 +259,13 @@ mod tests { sequence_number: 0, min_sequence_number: 0, added_snapshot_id: 1646658105718557341, - added_files_count: 0, - existing_files_count: 0, - deleted_files_count: 0, + added_data_files_count: 3, + existing_data_files_count: 0, + deleted_data_files_count: 0, added_rows_count: 3, existing_rows_count: 0, deleted_rows_count: 0, - partitions: Some(vec![]), + partitions: vec![], key_metadata: None, } ); @@ -306,13 +296,13 @@ mod tests { sequence_number: 0, min_sequence_number: 0, added_snapshot_id: 1646658105718557341, - added_files_count: 0, - existing_files_count: 0, - deleted_files_count: 0, + added_data_files_count: 3, + existing_data_files_count: 0, + deleted_data_files_count: 0, added_rows_count: 3, existing_rows_count: 0, deleted_rows_count: 0, - partitions: Some(vec![]), + partitions: vec![], key_metadata: None, } ); @@ -348,15 +338,7 @@ mod tests { }; let writer = ManifestListWriter::new(operator, filename.to_string(), 0, 0, 0); - let manifest_list_path = writer.write(manifest_file.clone()).await.unwrap(); - - assert_eq!( - canonicalize(format!("{dir_path}/{filename}")) - .unwrap() - .to_str() - .unwrap(), - manifest_list_path - ); + writer.write(manifest_file.clone()).await.unwrap(); let restored_manifest_file = { parse_manifest_list(&read(tmp_dir.path().join(filename)).unwrap()).unwrap() }; diff --git a/icelake/src/types/on_disk/schema.rs b/icelake/src/types/on_disk/schema.rs index 1de570b..6fb1024 100644 --- a/icelake/src/types/on_disk/schema.rs +++ b/icelake/src/types/on_disk/schema.rs @@ -23,6 +23,7 @@ pub struct Schema { #[serde(skip_serializing_if = "Option::is_none")] identifier_field_ids: Option>, fields: Vec, + r#type: String, } impl TryFrom for types::Schema { @@ -54,6 +55,7 @@ impl<'a> TryFrom<&'a types::Schema> for Schema { .iter() .map(|v| Field::try_from(v.clone())) .collect::>>()?, + r#type: "struct".to_string(), }) } } @@ -67,6 +69,7 @@ mod tests { assert_eq!(expected_schema, schema); let serialized_json_schema = serialize_schema(&expected_schema).unwrap(); + println!("{serialized_json_schema}"); assert_eq!( expected_schema, diff --git a/icelake/src/types/on_disk/snapshot.rs b/icelake/src/types/on_disk/snapshot.rs index 2636e9f..7692362 100644 --- a/icelake/src/types/on_disk/snapshot.rs +++ b/icelake/src/types/on_disk/snapshot.rs @@ -17,6 +17,7 @@ pub fn parse_snapshot(bs: &[u8]) -> Result { #[serde(rename_all = "kebab-case")] pub struct Snapshot { snapshot_id: i64, + #[serde(skip_serializing_if = "Option::is_none")] parent_snapshot_id: Option, #[serde(default)] sequence_number: i64, diff --git a/icelake/src/types/on_disk/table_metadata.rs b/icelake/src/types/on_disk/table_metadata.rs index 
d01228e..f1f5f6e 100644 --- a/icelake/src/types/on_disk/table_metadata.rs +++ b/icelake/src/types/on_disk/table_metadata.rs @@ -45,7 +45,7 @@ struct TableMetadata { metadata_log: Option>, sort_orders: Vec, default_sort_order_id: i32, - refs: Option>, + refs: HashMap, } impl TryFrom for types::TableMetadata { @@ -111,15 +111,12 @@ impl TryFrom for types::TableMetadata { sort_orders.push(sort_order.try_into()?); } - let refs = match v.refs { - Some(v) => { - let mut refs = HashMap::with_capacity(v.len()); - for (k, v) in v { - refs.insert(k, v.try_into()?); - } - Some(refs) + let refs = { + let mut refs = HashMap::with_capacity(v.refs.len()); + for (k, v) in v.refs { + refs.insert(k, v.try_into()?); } - None => None, + refs }; Ok(types::TableMetadata { @@ -204,12 +201,9 @@ impl TryFrom for TableMetadata { default_sort_order_id: value.default_sort_order_id, refs: value .refs - .map(|r| { - r.into_iter() - .map(|e| SnapshotReference::try_from(e.1).map(|s| (e.0, s))) - .collect::>>() - }) - .transpose()?, + .into_iter() + .map(|e| SnapshotReference::try_from(e.1).map(|s| (e.0, s))) + .collect::>>()?, }) } } @@ -411,7 +405,7 @@ mod tests { }]), sort_orders: vec![], default_sort_order_id: 1, - refs: None, + refs: HashMap::default(), }; let json = serialize_table_meta(metadata.clone()).unwrap(); diff --git a/icelake/src/types/to_avro.rs b/icelake/src/types/to_avro.rs index 46d2432..0a7acbc 100644 --- a/icelake/src/types/to_avro.rs +++ b/icelake/src/types/to_avro.rs @@ -12,20 +12,17 @@ use serde_json::{Number, Value as JsonValue}; use std::collections::BTreeMap; use std::iter::Iterator; -impl<'a> TryFrom<&'a Schema> for AvroSchema { - type Error = Error; +pub fn to_avro_schema(value: &Schema, name: Option<&str>) -> Result { + let avro_fields: Vec = value + .fields + .iter() + .map(AvroRecordField::try_from) + .collect::>>()?; - fn try_from(value: &'a Schema) -> Result { - let avro_fields: Vec = value - .fields - .iter() - .map(AvroRecordField::try_from) - .collect::>>()?; - Ok(AvroSchema::Record(avro_record_schema( - format!("r_{}", value.schema_id).as_str(), - avro_fields, - ))) - } + let name = name + .map(|s| s.to_string()) + .unwrap_or_else(|| format!("r_{}", value.schema_id)); + Ok(AvroSchema::Record(avro_record_schema(name, avro_fields))) } impl<'a> TryFrom<&'a Field> for AvroRecordField { @@ -47,6 +44,7 @@ impl<'a> TryFrom<&'a Field> for AvroRecordField { } avro_schema } + Any::List(_list) => AvroSchema::try_from(&value.field_type)?, _ => { let mut avro_schema = AvroSchema::try_from(&value.field_type)?; if !value.required { @@ -176,7 +174,7 @@ impl<'a> TryFrom<&'a Any> for AvroSchema { } fn avro_record_schema( - name: &str, + name: impl Into, fields: impl IntoIterator, ) -> AvroRecordSchema { let avro_fields = fields.into_iter().collect::>(); @@ -187,7 +185,7 @@ fn avro_record_schema( .collect(); AvroRecordSchema { - name: Name::from(name), + name: Name::from(name.into().as_str()), fields: avro_fields, aliases: None, doc: None, @@ -343,6 +341,6 @@ mod tests { AvroSchema::parse_str(raw_schema).unwrap() }; - assert_eq!(expected_avro_schema, AvroSchema::try_from(&schema).unwrap()); + assert_eq!(expected_avro_schema, to_avro_schema(&schema, None).unwrap()); } } diff --git a/tests/integration/docker/docker-compose.yml b/tests/integration/docker/docker-compose.yml new file mode 100644 index 0000000..507a3e7 --- /dev/null +++ b/tests/integration/docker/docker-compose.yml @@ -0,0 +1,66 @@ +version: "3" + +services: + spark: + image: apache/spark:3.4.1 + container_name: spark + user: root + 
depends_on: + - mc + healthcheck: + test: netstat -ltn | grep -c ":15002" + interval: 5s + retries: 120 + ports: + - "15002:15002" + networks: + iceberg_net: + environment: + - SPARK_HOME=/opt/spark + - PYSPARK_PYTON=/usr/bin/python3.9 + - PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/opt/spark/bin:/opt/spark/sbin + - AWS_ACCESS_KEY_ID=admin + - AWS_SECRET_ACCESS_KEY=password + - AWS_REGION=us-east-1 + volumes: + - './spark-script:/spark-script' + command: ["bash", "/spark-script/spark-connect-server.sh"] + + minio: + image: minio/minio + container_name: minio + environment: + MINIO_ROOT_USER: admin + MINIO_ROOT_PASSWORD: password + MINIO_DOMAIN: minio + MINIO_HTTP_TRACE: /dev/stdout + networks: + iceberg_net: + aliases: + - icebergdata.minio + ports: + - "9001:9001" + - "9000:9000" + command: [ "server", "/data", "--console-address", ":9001" ] + + mc: + depends_on: + - minio + image: minio/mc + container_name: mc + networks: + iceberg_net: + environment: + AWS_ACCESS_KEY_ID: admin + AWS_SECRET_ACCESS_KEY: password + AWS_REGION: us-east-1 + entrypoint: > + /bin/sh -c " + until (/usr/bin/mc config host add minio http://minio:9000 admin password) do echo '...waiting...' && sleep 1; done; + /usr/bin/mc rm -r --force minio/icebergdata; + /usr/bin/mc mb minio/icebergdata/demo; + /usr/bin/mc policy set public minio/icebergdata/demo; + tail -f /dev/null + " +networks: + iceberg_net: diff --git a/tests/integration/docker/spark-script/.gitignore b/tests/integration/docker/spark-script/.gitignore new file mode 100644 index 0000000..51dcf07 --- /dev/null +++ b/tests/integration/docker/spark-script/.gitignore @@ -0,0 +1,3 @@ +derby.log +metastore_db +.ivy \ No newline at end of file diff --git a/tests/integration/docker/spark-script/init-table.sql b/tests/integration/docker/spark-script/init-table.sql new file mode 100644 index 0000000..8f60592 --- /dev/null +++ b/tests/integration/docker/spark-script/init-table.sql @@ -0,0 +1,18 @@ +CREATE SCHEMA IF NOT EXISTS s1; + +USE s1; + +DROP TABLE IF EXISTS t1; + +CREATE TABLE t1 +( + id bigint, + name string, + distance bigint +) USING iceberg +TBLPROPERTIES ('format-version'='2'); + +INSERT INTO t1 VALUES (1, "a", 100), (2, "b", 200); + + + diff --git a/tests/integration/docker/spark-script/insert-table.sql b/tests/integration/docker/spark-script/insert-table.sql new file mode 100644 index 0000000..ebeb17d --- /dev/null +++ b/tests/integration/docker/spark-script/insert-table.sql @@ -0,0 +1,6 @@ +USE s1; + +INSERT INTO t1 VALUES (3, "a", 300), (4, "b", 400); + + + diff --git a/tests/integration/docker/spark-script/inspect-table.sql b/tests/integration/docker/spark-script/inspect-table.sql new file mode 100644 index 0000000..b626a74 --- /dev/null +++ b/tests/integration/docker/spark-script/inspect-table.sql @@ -0,0 +1,7 @@ +DESCRIBE demo.s1.t1.files; + +SELECT * FROM demo.s1.t1.files; + +DESCRIBE demo.s1.t1.manifests; + +SELECT * FROM demo.s1.t1.manifests; diff --git a/tests/integration/docker/spark-script/query-table.sql b/tests/integration/docker/spark-script/query-table.sql new file mode 100644 index 0000000..7de932b --- /dev/null +++ b/tests/integration/docker/spark-script/query-table.sql @@ -0,0 +1 @@ +SELECT * FROM demo.s1.t1 ORDER BY id ASC; diff --git a/tests/integration/docker/spark-script/run-sql-file.sh b/tests/integration/docker/spark-script/run-sql-file.sh new file mode 100644 index 0000000..685f228 --- /dev/null +++ b/tests/integration/docker/spark-script/run-sql-file.sh @@ -0,0 +1,27 @@ +set -ex + +ICEBERG_VERSION=1.3.1 
+DEPENDENCIES="org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:$ICEBERG_VERSION,org.apache.hadoop:hadoop-aws:3.3.2" + +## add AWS dependency +#AWS_SDK_VERSION=2.20.18 +#AWS_MAVEN_GROUP=software.amazon.awssdk +#AWS_PACKAGES=( +# "bundle" +#) +#for pkg in "${AWS_PACKAGES[@]}"; do +# DEPENDENCIES+=",$AWS_MAVEN_GROUP:$pkg:$AWS_SDK_VERSION" +#done + +spark-sql --packages $DEPENDENCIES \ + --master local[3] \ + --files /spark-script/log4j.properties \ + --conf spark.sql.catalog.demo=org.apache.iceberg.spark.SparkCatalog \ + --conf spark.sql.catalog.demo.type=hadoop \ + --conf spark.sql.catalog.demo.warehouse=s3a://icebergdata/demo \ + --conf spark.sql.catalog.demo.hadoop.fs.s3a.endpoint=http://minio:9000 \ + --conf spark.sql.catalog.demo.hadoop.fs.s3a.path.style.access=true \ + --conf spark.sql.catalog.demo.hadoop.fs.s3a.access.key=admin \ + --conf spark.sql.catalog.demo.hadoop.fs.s3a.secret.key=password \ + --conf spark.sql.defaultCatalog=demo \ + -f /spark-script/$1.sql diff --git a/tests/integration/docker/spark-script/spark-connect-server.sh b/tests/integration/docker/spark-script/spark-connect-server.sh new file mode 100644 index 0000000..d4431ed --- /dev/null +++ b/tests/integration/docker/spark-script/spark-connect-server.sh @@ -0,0 +1,21 @@ +set -ex + +ICEBERG_VERSION=1.3.1 +SPARK_VERSION=3.4.1 + +PACKAGES="org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:$ICEBERG_VERSION,org.apache.hadoop:hadoop-aws:3.3.2" +PACKAGES="$PACKAGES,org.apache.spark:spark-connect_2.12:$SPARK_VERSION" + +/opt/spark/sbin/start-connect-server.sh --packages $PACKAGES \ + --master local[3] \ + --conf spark.driver.bindAddress=0.0.0.0 \ + --conf spark.sql.catalog.demo=org.apache.iceberg.spark.SparkCatalog \ + --conf spark.sql.catalog.demo.type=hadoop \ + --conf spark.sql.catalog.demo.warehouse=s3a://icebergdata/demo \ + --conf spark.sql.catalog.demo.hadoop.fs.s3a.endpoint=http://minio:9000 \ + --conf spark.sql.catalog.demo.hadoop.fs.s3a.path.style.access=true \ + --conf spark.sql.catalog.demo.hadoop.fs.s3a.access.key=admin \ + --conf spark.sql.catalog.demo.hadoop.fs.s3a.secret.key=password \ + --conf spark.sql.defaultCatalog=demo + +tail -f /opt/spark/logs/spark*.out \ No newline at end of file diff --git a/tests/integration/python/check.py b/tests/integration/python/check.py new file mode 100644 index 0000000..c1e88e0 --- /dev/null +++ b/tests/integration/python/check.py @@ -0,0 +1,31 @@ +from pyspark.sql import SparkSession +import csv +import argparse +import unittest + + +def check(args): + tc = unittest.TestCase() + spark = SparkSession.builder.remote(args.sparkurl).getOrCreate() + + sql = "SELECT * FROM s1.t1 ORDER BY id ASC" + print(f"Executing sql: {sql}") + df = spark.sql(sql).collect() + for row in df: + print(row) + + with open(args.file, newline='') as insert_csv_file: + csv_result = csv.reader(insert_csv_file) + for (row1, row2) in zip(df, csv_result): + print(f"Row1: {row1}, row 2: {row2}") + tc.assertEqual(row1[0], int(row2[0])) + tc.assertEqual(row1[1], row2[1]) + tc.assertEqual(row1[2], int(row2[2])) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description='Test icelake with spark') + parser.add_argument('-s', dest='sparkurl', type=str, help='Spark remote url') + parser.add_argument("-f", dest='file', type=str, help='Path to query csv file') + + check(parser.parse_args()) diff --git a/tests/integration/python/init.py b/tests/integration/python/init.py new file mode 100644 index 0000000..c248c07 --- /dev/null +++ b/tests/integration/python/init.py @@ -0,0 +1,41 @@ +from 
pyspark.sql import SparkSession +import csv +import argparse + + +def check(args): + spark = SparkSession.builder.remote(args.sparkurl).getOrCreate() + + init_table_sqls = [ + "CREATE SCHEMA IF NOT EXISTS s1", + "DROP TABLE IF EXISTS s1.t1", + """ + CREATE TABLE s1.t1 + ( + id bigint, + name string, + distance bigint + ) USING iceberg + TBLPROPERTIES ('format-version'='2'); + """ + ] + + for sql in init_table_sqls: + print(f"Executing sql: {sql}") + spark.sql(sql) + + with open(args.file, newline='') as insert_csv_file: + inserts = ", ".join([f"""({row[0]}, "{row[1]}", {row[2]})""" for row in csv.reader(insert_csv_file)]) + sql = f""" + INSERT INTO s1.t1 VALUES {inserts} + """ + print(f"Executing sql: {sql}") + spark.sql(sql) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description='Test icelake with spark') + parser.add_argument('-s', dest='sparkurl', type=str, help='Spark remote url') + parser.add_argument("-f", dest='file', type=str, help='Path to insert csv file') + + check(parser.parse_args()) diff --git a/tests/integration/python/poetry.lock b/tests/integration/python/poetry.lock new file mode 100644 index 0000000..d8176f7 --- /dev/null +++ b/tests/integration/python/poetry.lock @@ -0,0 +1,339 @@ +# This file is automatically @generated by Poetry 1.5.1 and should not be changed by hand. + +[[package]] +name = "googleapis-common-protos" +version = "1.60.0" +description = "Common protobufs used in Google APIs" +optional = false +python-versions = ">=3.7" +files = [ + {file = "googleapis-common-protos-1.60.0.tar.gz", hash = "sha256:e73ebb404098db405ba95d1e1ae0aa91c3e15a71da031a2eeb6b2e23e7bc3708"}, + {file = "googleapis_common_protos-1.60.0-py2.py3-none-any.whl", hash = "sha256:69f9bbcc6acde92cab2db95ce30a70bd2b81d20b12eff3f1aabaffcbe8a93918"}, +] + +[package.dependencies] +protobuf = ">=3.19.5,<3.20.0 || >3.20.0,<3.20.1 || >3.20.1,<4.21.1 || >4.21.1,<4.21.2 || >4.21.2,<4.21.3 || >4.21.3,<4.21.4 || >4.21.4,<4.21.5 || >4.21.5,<5.0.0.dev0" + +[package.extras] +grpc = ["grpcio (>=1.44.0,<2.0.0.dev0)"] + +[[package]] +name = "grpcio" +version = "1.56.2" +description = "HTTP/2-based RPC framework" +optional = false +python-versions = ">=3.7" +files = [ + {file = "grpcio-1.56.2-cp310-cp310-linux_armv7l.whl", hash = "sha256:bf0b9959e673505ee5869950642428046edb91f99942607c2ecf635f8a4b31c9"}, + {file = "grpcio-1.56.2-cp310-cp310-macosx_12_0_universal2.whl", hash = "sha256:5144feb20fe76e73e60c7d73ec3bf54f320247d1ebe737d10672480371878b48"}, + {file = "grpcio-1.56.2-cp310-cp310-manylinux_2_17_aarch64.whl", hash = "sha256:a72797549935c9e0b9bc1def1768c8b5a709538fa6ab0678e671aec47ebfd55e"}, + {file = "grpcio-1.56.2-cp310-cp310-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:c3f3237a57e42f79f1e560726576aedb3a7ef931f4e3accb84ebf6acc485d316"}, + {file = "grpcio-1.56.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:900bc0096c2ca2d53f2e5cebf98293a7c32f532c4aeb926345e9747452233950"}, + {file = "grpcio-1.56.2-cp310-cp310-musllinux_1_1_i686.whl", hash = "sha256:97e0efaebbfd222bcaac2f1735c010c1d3b167112d9d237daebbeedaaccf3d1d"}, + {file = "grpcio-1.56.2-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:c0c85c5cbe8b30a32fa6d802588d55ffabf720e985abe9590c7c886919d875d4"}, + {file = "grpcio-1.56.2-cp310-cp310-win32.whl", hash = "sha256:06e84ad9ae7668a109e970c7411e7992751a116494cba7c4fb877656527f9a57"}, + {file = "grpcio-1.56.2-cp310-cp310-win_amd64.whl", hash = "sha256:10954662f77dc36c9a1fb5cc4a537f746580d6b5734803be1e587252682cda8d"}, + {file = 
"grpcio-1.56.2-cp311-cp311-linux_armv7l.whl", hash = "sha256:c435f5ce1705de48e08fcbcfaf8aee660d199c90536e3e06f2016af7d6a938dd"}, + {file = "grpcio-1.56.2-cp311-cp311-macosx_10_10_universal2.whl", hash = "sha256:6108e5933eb8c22cd3646e72d5b54772c29f57482fd4c41a0640aab99eb5071d"}, + {file = "grpcio-1.56.2-cp311-cp311-manylinux_2_17_aarch64.whl", hash = "sha256:8391cea5ce72f4a12368afd17799474015d5d3dc00c936a907eb7c7eaaea98a5"}, + {file = "grpcio-1.56.2-cp311-cp311-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:750de923b456ca8c0f1354d6befca45d1f3b3a789e76efc16741bd4132752d95"}, + {file = "grpcio-1.56.2-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:fda2783c12f553cdca11c08e5af6eecbd717280dc8fbe28a110897af1c15a88c"}, + {file = "grpcio-1.56.2-cp311-cp311-musllinux_1_1_i686.whl", hash = "sha256:9e04d4e4cfafa7c5264e535b5d28e786f0571bea609c3f0aaab13e891e933e9c"}, + {file = "grpcio-1.56.2-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:89a49cc5ad08a38b6141af17e00d1dd482dc927c7605bc77af457b5a0fca807c"}, + {file = "grpcio-1.56.2-cp311-cp311-win32.whl", hash = "sha256:6a007a541dff984264981fbafeb052bfe361db63578948d857907df9488d8774"}, + {file = "grpcio-1.56.2-cp311-cp311-win_amd64.whl", hash = "sha256:af4063ef2b11b96d949dccbc5a987272f38d55c23c4c01841ea65a517906397f"}, + {file = "grpcio-1.56.2-cp37-cp37m-linux_armv7l.whl", hash = "sha256:a6ff459dac39541e6a2763a4439c4ca6bc9ecb4acc05a99b79246751f9894756"}, + {file = "grpcio-1.56.2-cp37-cp37m-macosx_10_10_universal2.whl", hash = "sha256:f20fd21f7538f8107451156dd1fe203300b79a9ddceba1ee0ac8132521a008ed"}, + {file = "grpcio-1.56.2-cp37-cp37m-manylinux_2_17_aarch64.whl", hash = "sha256:d1fbad1f9077372b6587ec589c1fc120b417b6c8ad72d3e3cc86bbbd0a3cee93"}, + {file = "grpcio-1.56.2-cp37-cp37m-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:6ee26e9dfb3996aff7c870f09dc7ad44a5f6732b8bdb5a5f9905737ac6fd4ef1"}, + {file = "grpcio-1.56.2-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:a4c60abd950d6de3e4f1ddbc318075654d275c29c846ab6a043d6ed2c52e4c8c"}, + {file = "grpcio-1.56.2-cp37-cp37m-musllinux_1_1_i686.whl", hash = "sha256:1c31e52a04e62c8577a7bf772b3e7bed4df9c9e0dd90f92b6ffa07c16cab63c9"}, + {file = "grpcio-1.56.2-cp37-cp37m-musllinux_1_1_x86_64.whl", hash = "sha256:345356b307cce5d14355e8e055b4ca5f99bc857c33a3dc1ddbc544fca9cd0475"}, + {file = "grpcio-1.56.2-cp37-cp37m-win_amd64.whl", hash = "sha256:42e63904ee37ae46aa23de50dac8b145b3596f43598fa33fe1098ab2cbda6ff5"}, + {file = "grpcio-1.56.2-cp38-cp38-linux_armv7l.whl", hash = "sha256:7c5ede2e2558f088c49a1ddda19080e4c23fb5d171de80a726b61b567e3766ed"}, + {file = "grpcio-1.56.2-cp38-cp38-macosx_10_10_universal2.whl", hash = "sha256:33971197c47965cc1d97d78d842163c283e998223b151bab0499b951fd2c0b12"}, + {file = "grpcio-1.56.2-cp38-cp38-manylinux_2_17_aarch64.whl", hash = "sha256:d39f5d4af48c138cb146763eda14eb7d8b3ccbbec9fe86fb724cd16e0e914c64"}, + {file = "grpcio-1.56.2-cp38-cp38-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:ded637176addc1d3eef35331c39acc598bac550d213f0a1bedabfceaa2244c87"}, + {file = "grpcio-1.56.2-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:c90da4b124647547a68cf2f197174ada30c7bb9523cb976665dfd26a9963d328"}, + {file = "grpcio-1.56.2-cp38-cp38-musllinux_1_1_i686.whl", hash = "sha256:3ccb621749a81dc7755243665a70ce45536ec413ef5818e013fe8dfbf5aa497b"}, + {file = "grpcio-1.56.2-cp38-cp38-musllinux_1_1_x86_64.whl", hash = 
"sha256:4eb37dd8dd1aa40d601212afa27ca5be255ba792e2e0b24d67b8af5e012cdb7d"}, + {file = "grpcio-1.56.2-cp38-cp38-win32.whl", hash = "sha256:ddb4a6061933bd9332b74eac0da25f17f32afa7145a33a0f9711ad74f924b1b8"}, + {file = "grpcio-1.56.2-cp38-cp38-win_amd64.whl", hash = "sha256:8940d6de7068af018dfa9a959a3510e9b7b543f4c405e88463a1cbaa3b2b379a"}, + {file = "grpcio-1.56.2-cp39-cp39-linux_armv7l.whl", hash = "sha256:51173e8fa6d9a2d85c14426bdee5f5c4a0654fd5fddcc21fe9d09ab0f6eb8b35"}, + {file = "grpcio-1.56.2-cp39-cp39-macosx_10_10_universal2.whl", hash = "sha256:373b48f210f43327a41e397391715cd11cfce9ded2fe76a5068f9bacf91cc226"}, + {file = "grpcio-1.56.2-cp39-cp39-manylinux_2_17_aarch64.whl", hash = "sha256:42a3bbb2bc07aef72a7d97e71aabecaf3e4eb616d39e5211e2cfe3689de860ca"}, + {file = "grpcio-1.56.2-cp39-cp39-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:5344be476ac37eb9c9ad09c22f4ea193c1316bf074f1daf85bddb1b31fda5116"}, + {file = "grpcio-1.56.2-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:c3fa3ab0fb200a2c66493828ed06ccd1a94b12eddbfb985e7fd3e5723ff156c6"}, + {file = "grpcio-1.56.2-cp39-cp39-musllinux_1_1_i686.whl", hash = "sha256:b975b85d1d5efc36cf8b237c5f3849b64d1ba33d6282f5e991f28751317504a1"}, + {file = "grpcio-1.56.2-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:cbdf2c498e077282cd427cfd88bdce4668019791deef0be8155385ab2ba7837f"}, + {file = "grpcio-1.56.2-cp39-cp39-win32.whl", hash = "sha256:139f66656a762572ae718fa0d1f2dce47c05e9fbf7a16acd704c354405b97df9"}, + {file = "grpcio-1.56.2-cp39-cp39-win_amd64.whl", hash = "sha256:830215173ad45d670140ff99aac3b461f9be9a6b11bee1a17265aaaa746a641a"}, + {file = "grpcio-1.56.2.tar.gz", hash = "sha256:0ff789ae7d8ddd76d2ac02e7d13bfef6fc4928ac01e1dcaa182be51b6bcc0aaa"}, +] + +[package.extras] +protobuf = ["grpcio-tools (>=1.56.2)"] + +[[package]] +name = "grpcio-status" +version = "1.56.2" +description = "Status proto mapping for gRPC" +optional = false +python-versions = ">=3.6" +files = [ + {file = "grpcio-status-1.56.2.tar.gz", hash = "sha256:a046b2c0118df4a5687f4585cca9d3c3bae5c498c4dff055dcb43fb06a1180c8"}, + {file = "grpcio_status-1.56.2-py3-none-any.whl", hash = "sha256:63f3842867735f59f5d70e723abffd2e8501a6bcd915612a1119e52f10614782"}, +] + +[package.dependencies] +googleapis-common-protos = ">=1.5.5" +grpcio = ">=1.56.2" +protobuf = ">=4.21.6" + +[[package]] +name = "numpy" +version = "1.25.2" +description = "Fundamental package for array computing in Python" +optional = false +python-versions = ">=3.9" +files = [ + {file = "numpy-1.25.2-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:db3ccc4e37a6873045580d413fe79b68e47a681af8db2e046f1dacfa11f86eb3"}, + {file = "numpy-1.25.2-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:90319e4f002795ccfc9050110bbbaa16c944b1c37c0baeea43c5fb881693ae1f"}, + {file = "numpy-1.25.2-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:dfe4a913e29b418d096e696ddd422d8a5d13ffba4ea91f9f60440a3b759b0187"}, + {file = "numpy-1.25.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:f08f2e037bba04e707eebf4bc934f1972a315c883a9e0ebfa8a7756eabf9e357"}, + {file = "numpy-1.25.2-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:bec1e7213c7cb00d67093247f8c4db156fd03075f49876957dca4711306d39c9"}, + {file = "numpy-1.25.2-cp310-cp310-win32.whl", hash = "sha256:7dc869c0c75988e1c693d0e2d5b26034644399dd929bc049db55395b1379e044"}, + {file = "numpy-1.25.2-cp310-cp310-win_amd64.whl", hash = 
"sha256:834b386f2b8210dca38c71a6e0f4fd6922f7d3fcff935dbe3a570945acb1b545"}, + {file = "numpy-1.25.2-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:c5462d19336db4560041517dbb7759c21d181a67cb01b36ca109b2ae37d32418"}, + {file = "numpy-1.25.2-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:c5652ea24d33585ea39eb6a6a15dac87a1206a692719ff45d53c5282e66d4a8f"}, + {file = "numpy-1.25.2-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:0d60fbae8e0019865fc4784745814cff1c421df5afee233db6d88ab4f14655a2"}, + {file = "numpy-1.25.2-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:60e7f0f7f6d0eee8364b9a6304c2845b9c491ac706048c7e8cf47b83123b8dbf"}, + {file = "numpy-1.25.2-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:bb33d5a1cf360304754913a350edda36d5b8c5331a8237268c48f91253c3a364"}, + {file = "numpy-1.25.2-cp311-cp311-win32.whl", hash = "sha256:5883c06bb92f2e6c8181df7b39971a5fb436288db58b5a1c3967702d4278691d"}, + {file = "numpy-1.25.2-cp311-cp311-win_amd64.whl", hash = "sha256:5c97325a0ba6f9d041feb9390924614b60b99209a71a69c876f71052521d42a4"}, + {file = "numpy-1.25.2-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:b79e513d7aac42ae918db3ad1341a015488530d0bb2a6abcbdd10a3a829ccfd3"}, + {file = "numpy-1.25.2-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:eb942bfb6f84df5ce05dbf4b46673ffed0d3da59f13635ea9b926af3deb76926"}, + {file = "numpy-1.25.2-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:3e0746410e73384e70d286f93abf2520035250aad8c5714240b0492a7302fdca"}, + {file = "numpy-1.25.2-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:d7806500e4f5bdd04095e849265e55de20d8cc4b661b038957354327f6d9b295"}, + {file = "numpy-1.25.2-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:8b77775f4b7df768967a7c8b3567e309f617dd5e99aeb886fa14dc1a0791141f"}, + {file = "numpy-1.25.2-cp39-cp39-win32.whl", hash = "sha256:2792d23d62ec51e50ce4d4b7d73de8f67a2fd3ea710dcbc8563a51a03fb07b01"}, + {file = "numpy-1.25.2-cp39-cp39-win_amd64.whl", hash = "sha256:76b4115d42a7dfc5d485d358728cdd8719be33cc5ec6ec08632a5d6fca2ed380"}, + {file = "numpy-1.25.2-pp39-pypy39_pp73-macosx_10_9_x86_64.whl", hash = "sha256:1a1329e26f46230bf77b02cc19e900db9b52f398d6722ca853349a782d4cff55"}, + {file = "numpy-1.25.2-pp39-pypy39_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:4c3abc71e8b6edba80a01a52e66d83c5d14433cbcd26a40c329ec7ed09f37901"}, + {file = "numpy-1.25.2-pp39-pypy39_pp73-win_amd64.whl", hash = "sha256:1b9735c27cea5d995496f46a8b1cd7b408b3f34b6d50459d9ac8fe3a20cc17bf"}, + {file = "numpy-1.25.2.tar.gz", hash = "sha256:fd608e19c8d7c55021dffd43bfe5492fab8cc105cc8986f813f8c3c048b38760"}, +] + +[[package]] +name = "pandas" +version = "2.0.3" +description = "Powerful data structures for data analysis, time series, and statistics" +optional = false +python-versions = ">=3.8" +files = [ + {file = "pandas-2.0.3-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:e4c7c9f27a4185304c7caf96dc7d91bc60bc162221152de697c98eb0b2648dd8"}, + {file = "pandas-2.0.3-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:f167beed68918d62bffb6ec64f2e1d8a7d297a038f86d4aed056b9493fca407f"}, + {file = "pandas-2.0.3-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:ce0c6f76a0f1ba361551f3e6dceaff06bde7514a374aa43e33b588ec10420183"}, + {file = "pandas-2.0.3-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:ba619e410a21d8c387a1ea6e8a0e49bb42216474436245718d7f2e88a2f8d7c0"}, + {file = 
"pandas-2.0.3-cp310-cp310-win32.whl", hash = "sha256:3ef285093b4fe5058eefd756100a367f27029913760773c8bf1d2d8bebe5d210"}, + {file = "pandas-2.0.3-cp310-cp310-win_amd64.whl", hash = "sha256:9ee1a69328d5c36c98d8e74db06f4ad518a1840e8ccb94a4ba86920986bb617e"}, + {file = "pandas-2.0.3-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:b084b91d8d66ab19f5bb3256cbd5ea661848338301940e17f4492b2ce0801fe8"}, + {file = "pandas-2.0.3-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:37673e3bdf1551b95bf5d4ce372b37770f9529743d2498032439371fc7b7eb26"}, + {file = "pandas-2.0.3-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:b9cb1e14fdb546396b7e1b923ffaeeac24e4cedd14266c3497216dd4448e4f2d"}, + {file = "pandas-2.0.3-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:d9cd88488cceb7635aebb84809d087468eb33551097d600c6dad13602029c2df"}, + {file = "pandas-2.0.3-cp311-cp311-win32.whl", hash = "sha256:694888a81198786f0e164ee3a581df7d505024fbb1f15202fc7db88a71d84ebd"}, + {file = "pandas-2.0.3-cp311-cp311-win_amd64.whl", hash = "sha256:6a21ab5c89dcbd57f78d0ae16630b090eec626360085a4148693def5452d8a6b"}, + {file = "pandas-2.0.3-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:9e4da0d45e7f34c069fe4d522359df7d23badf83abc1d1cef398895822d11061"}, + {file = "pandas-2.0.3-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:32fca2ee1b0d93dd71d979726b12b61faa06aeb93cf77468776287f41ff8fdc5"}, + {file = "pandas-2.0.3-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:258d3624b3ae734490e4d63c430256e716f488c4fcb7c8e9bde2d3aa46c29089"}, + {file = "pandas-2.0.3-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:9eae3dc34fa1aa7772dd3fc60270d13ced7346fcbcfee017d3132ec625e23bb0"}, + {file = "pandas-2.0.3-cp38-cp38-win32.whl", hash = "sha256:f3421a7afb1a43f7e38e82e844e2bca9a6d793d66c1a7f9f0ff39a795bbc5e02"}, + {file = "pandas-2.0.3-cp38-cp38-win_amd64.whl", hash = "sha256:69d7f3884c95da3a31ef82b7618af5710dba95bb885ffab339aad925c3e8ce78"}, + {file = "pandas-2.0.3-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:5247fb1ba347c1261cbbf0fcfba4a3121fbb4029d95d9ef4dc45406620b25c8b"}, + {file = "pandas-2.0.3-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:81af086f4543c9d8bb128328b5d32e9986e0c84d3ee673a2ac6fb57fd14f755e"}, + {file = "pandas-2.0.3-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:1994c789bf12a7c5098277fb43836ce090f1073858c10f9220998ac74f37c69b"}, + {file = "pandas-2.0.3-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:5ec591c48e29226bcbb316e0c1e9423622bc7a4eaf1ef7c3c9fa1a3981f89641"}, + {file = "pandas-2.0.3-cp39-cp39-win32.whl", hash = "sha256:04dbdbaf2e4d46ca8da896e1805bc04eb85caa9a82e259e8eed00254d5e0c682"}, + {file = "pandas-2.0.3-cp39-cp39-win_amd64.whl", hash = "sha256:1168574b036cd8b93abc746171c9b4f1b83467438a5e45909fed645cf8692dbc"}, + {file = "pandas-2.0.3.tar.gz", hash = "sha256:c02f372a88e0d17f36d3093a644c73cfc1788e876a7c4bcb4020a77512e2043c"}, +] + +[package.dependencies] +numpy = [ + {version = ">=1.21.0", markers = "python_version >= \"3.10\""}, + {version = ">=1.23.2", markers = "python_version >= \"3.11\""}, +] +python-dateutil = ">=2.8.2" +pytz = ">=2020.1" +tzdata = ">=2022.1" + +[package.extras] +all = ["PyQt5 (>=5.15.1)", "SQLAlchemy (>=1.4.16)", "beautifulsoup4 (>=4.9.3)", "bottleneck (>=1.3.2)", "brotlipy (>=0.7.0)", "fastparquet (>=0.6.3)", "fsspec (>=2021.07.0)", "gcsfs (>=2021.07.0)", "html5lib (>=1.1)", "hypothesis (>=6.34.2)", "jinja2 (>=3.0.0)", "lxml 
(>=4.6.3)", "matplotlib (>=3.6.1)", "numba (>=0.53.1)", "numexpr (>=2.7.3)", "odfpy (>=1.4.1)", "openpyxl (>=3.0.7)", "pandas-gbq (>=0.15.0)", "psycopg2 (>=2.8.6)", "pyarrow (>=7.0.0)", "pymysql (>=1.0.2)", "pyreadstat (>=1.1.2)", "pytest (>=7.3.2)", "pytest-asyncio (>=0.17.0)", "pytest-xdist (>=2.2.0)", "python-snappy (>=0.6.0)", "pyxlsb (>=1.0.8)", "qtpy (>=2.2.0)", "s3fs (>=2021.08.0)", "scipy (>=1.7.1)", "tables (>=3.6.1)", "tabulate (>=0.8.9)", "xarray (>=0.21.0)", "xlrd (>=2.0.1)", "xlsxwriter (>=1.4.3)", "zstandard (>=0.15.2)"] +aws = ["s3fs (>=2021.08.0)"] +clipboard = ["PyQt5 (>=5.15.1)", "qtpy (>=2.2.0)"] +compression = ["brotlipy (>=0.7.0)", "python-snappy (>=0.6.0)", "zstandard (>=0.15.2)"] +computation = ["scipy (>=1.7.1)", "xarray (>=0.21.0)"] +excel = ["odfpy (>=1.4.1)", "openpyxl (>=3.0.7)", "pyxlsb (>=1.0.8)", "xlrd (>=2.0.1)", "xlsxwriter (>=1.4.3)"] +feather = ["pyarrow (>=7.0.0)"] +fss = ["fsspec (>=2021.07.0)"] +gcp = ["gcsfs (>=2021.07.0)", "pandas-gbq (>=0.15.0)"] +hdf5 = ["tables (>=3.6.1)"] +html = ["beautifulsoup4 (>=4.9.3)", "html5lib (>=1.1)", "lxml (>=4.6.3)"] +mysql = ["SQLAlchemy (>=1.4.16)", "pymysql (>=1.0.2)"] +output-formatting = ["jinja2 (>=3.0.0)", "tabulate (>=0.8.9)"] +parquet = ["pyarrow (>=7.0.0)"] +performance = ["bottleneck (>=1.3.2)", "numba (>=0.53.1)", "numexpr (>=2.7.1)"] +plot = ["matplotlib (>=3.6.1)"] +postgresql = ["SQLAlchemy (>=1.4.16)", "psycopg2 (>=2.8.6)"] +spss = ["pyreadstat (>=1.1.2)"] +sql-other = ["SQLAlchemy (>=1.4.16)"] +test = ["hypothesis (>=6.34.2)", "pytest (>=7.3.2)", "pytest-asyncio (>=0.17.0)", "pytest-xdist (>=2.2.0)"] +xml = ["lxml (>=4.6.3)"] + +[[package]] +name = "protobuf" +version = "4.23.4" +description = "" +optional = false +python-versions = ">=3.7" +files = [ + {file = "protobuf-4.23.4-cp310-abi3-win32.whl", hash = "sha256:5fea3c64d41ea5ecf5697b83e41d09b9589e6f20b677ab3c48e5f242d9b7897b"}, + {file = "protobuf-4.23.4-cp310-abi3-win_amd64.whl", hash = "sha256:7b19b6266d92ca6a2a87effa88ecc4af73ebc5cfde194dc737cf8ef23a9a3b12"}, + {file = "protobuf-4.23.4-cp37-abi3-macosx_10_9_universal2.whl", hash = "sha256:8547bf44fe8cec3c69e3042f5c4fb3e36eb2a7a013bb0a44c018fc1e427aafbd"}, + {file = "protobuf-4.23.4-cp37-abi3-manylinux2014_aarch64.whl", hash = "sha256:fee88269a090ada09ca63551bf2f573eb2424035bcf2cb1b121895b01a46594a"}, + {file = "protobuf-4.23.4-cp37-abi3-manylinux2014_x86_64.whl", hash = "sha256:effeac51ab79332d44fba74660d40ae79985901ac21bca408f8dc335a81aa597"}, + {file = "protobuf-4.23.4-cp37-cp37m-win32.whl", hash = "sha256:c3e0939433c40796ca4cfc0fac08af50b00eb66a40bbbc5dee711998fb0bbc1e"}, + {file = "protobuf-4.23.4-cp37-cp37m-win_amd64.whl", hash = "sha256:9053df6df8e5a76c84339ee4a9f5a2661ceee4a0dab019e8663c50ba324208b0"}, + {file = "protobuf-4.23.4-cp38-cp38-win32.whl", hash = "sha256:e1c915778d8ced71e26fcf43c0866d7499891bca14c4368448a82edc61fdbc70"}, + {file = "protobuf-4.23.4-cp38-cp38-win_amd64.whl", hash = "sha256:351cc90f7d10839c480aeb9b870a211e322bf05f6ab3f55fcb2f51331f80a7d2"}, + {file = "protobuf-4.23.4-cp39-cp39-win32.whl", hash = "sha256:6dd9b9940e3f17077e820b75851126615ee38643c2c5332aa7a359988820c720"}, + {file = "protobuf-4.23.4-cp39-cp39-win_amd64.whl", hash = "sha256:0a5759f5696895de8cc913f084e27fd4125e8fb0914bb729a17816a33819f474"}, + {file = "protobuf-4.23.4-py3-none-any.whl", hash = "sha256:e9d0be5bf34b275b9f87ba7407796556abeeba635455d036c7351f7c183ef8ff"}, + {file = "protobuf-4.23.4.tar.gz", hash = "sha256:ccd9430c0719dce806b93f89c91de7977304729e55377f872a92465d548329a9"}, +] + 
+[[package]] +name = "py4j" +version = "0.10.9.7" +description = "Enables Python programs to dynamically access arbitrary Java objects" +optional = false +python-versions = "*" +files = [ + {file = "py4j-0.10.9.7-py2.py3-none-any.whl", hash = "sha256:85defdfd2b2376eb3abf5ca6474b51ab7e0de341c75a02f46dc9b5976f5a5c1b"}, + {file = "py4j-0.10.9.7.tar.gz", hash = "sha256:0b6e5315bb3ada5cf62ac651d107bb2ebc02def3dee9d9548e3baac644ea8dbb"}, +] + +[[package]] +name = "pyarrow" +version = "12.0.1" +description = "Python library for Apache Arrow" +optional = false +python-versions = ">=3.7" +files = [ + {file = "pyarrow-12.0.1-cp310-cp310-macosx_10_14_x86_64.whl", hash = "sha256:6d288029a94a9bb5407ceebdd7110ba398a00412c5b0155ee9813a40d246c5df"}, + {file = "pyarrow-12.0.1-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:345e1828efdbd9aa4d4de7d5676778aba384a2c3add896d995b23d368e60e5af"}, + {file = "pyarrow-12.0.1-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:8d6009fdf8986332b2169314da482baed47ac053311c8934ac6651e614deacd6"}, + {file = "pyarrow-12.0.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:2d3c4cbbf81e6dd23fe921bc91dc4619ea3b79bc58ef10bce0f49bdafb103daf"}, + {file = "pyarrow-12.0.1-cp310-cp310-win_amd64.whl", hash = "sha256:cdacf515ec276709ac8042c7d9bd5be83b4f5f39c6c037a17a60d7ebfd92c890"}, + {file = "pyarrow-12.0.1-cp311-cp311-macosx_10_14_x86_64.whl", hash = "sha256:749be7fd2ff260683f9cc739cb862fb11be376de965a2a8ccbf2693b098db6c7"}, + {file = "pyarrow-12.0.1-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:6895b5fb74289d055c43db3af0de6e16b07586c45763cb5e558d38b86a91e3a7"}, + {file = "pyarrow-12.0.1-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:1887bdae17ec3b4c046fcf19951e71b6a619f39fa674f9881216173566c8f718"}, + {file = "pyarrow-12.0.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:e2c9cb8eeabbadf5fcfc3d1ddea616c7ce893db2ce4dcef0ac13b099ad7ca082"}, + {file = "pyarrow-12.0.1-cp311-cp311-win_amd64.whl", hash = "sha256:ce4aebdf412bd0eeb800d8e47db854f9f9f7e2f5a0220440acf219ddfddd4f63"}, + {file = "pyarrow-12.0.1-cp37-cp37m-macosx_10_14_x86_64.whl", hash = "sha256:e0d8730c7f6e893f6db5d5b86eda42c0a130842d101992b581e2138e4d5663d3"}, + {file = "pyarrow-12.0.1-cp37-cp37m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:43364daec02f69fec89d2315f7fbfbeec956e0d991cbbef471681bd77875c40f"}, + {file = "pyarrow-12.0.1-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:051f9f5ccf585f12d7de836e50965b3c235542cc896959320d9776ab93f3b33d"}, + {file = "pyarrow-12.0.1-cp37-cp37m-win_amd64.whl", hash = "sha256:be2757e9275875d2a9c6e6052ac7957fbbfc7bc7370e4a036a9b893e96fedaba"}, + {file = "pyarrow-12.0.1-cp38-cp38-macosx_10_14_x86_64.whl", hash = "sha256:cf812306d66f40f69e684300f7af5111c11f6e0d89d6b733e05a3de44961529d"}, + {file = "pyarrow-12.0.1-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:459a1c0ed2d68671188b2118c63bac91eaef6fc150c77ddd8a583e3c795737bf"}, + {file = "pyarrow-12.0.1-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:85e705e33eaf666bbe508a16fd5ba27ca061e177916b7a317ba5a51bee43384c"}, + {file = "pyarrow-12.0.1-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:9120c3eb2b1f6f516a3b7a9714ed860882d9ef98c4b17edcdc91d95b7528db60"}, + {file = "pyarrow-12.0.1-cp38-cp38-win_amd64.whl", hash = "sha256:c780f4dc40460015d80fcd6a6140de80b615349ed68ef9adb653fe351778c9b3"}, + {file = 
"pyarrow-12.0.1-cp39-cp39-macosx_10_14_x86_64.whl", hash = "sha256:a3c63124fc26bf5f95f508f5d04e1ece8cc23a8b0af2a1e6ab2b1ec3fdc91b24"}, + {file = "pyarrow-12.0.1-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:b13329f79fa4472324f8d32dc1b1216616d09bd1e77cfb13104dec5463632c36"}, + {file = "pyarrow-12.0.1-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:bb656150d3d12ec1396f6dde542db1675a95c0cc8366d507347b0beed96e87ca"}, + {file = "pyarrow-12.0.1-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6251e38470da97a5b2e00de5c6a049149f7b2bd62f12fa5dbb9ac674119ba71a"}, + {file = "pyarrow-12.0.1-cp39-cp39-win_amd64.whl", hash = "sha256:3de26da901216149ce086920547dfff5cd22818c9eab67ebc41e863a5883bac7"}, + {file = "pyarrow-12.0.1.tar.gz", hash = "sha256:cce317fc96e5b71107bf1f9f184d5e54e2bd14bbf3f9a3d62819961f0af86fec"}, +] + +[package.dependencies] +numpy = ">=1.16.6" + +[[package]] +name = "pyspark" +version = "3.4.1" +description = "Apache Spark Python API" +optional = false +python-versions = ">=3.7" +files = [ + {file = "pyspark-3.4.1.tar.gz", hash = "sha256:72cd66ab8cf61a75854e5a753f75bea35ee075c3a96f9de4e2a66d02ec7fc652"}, +] + +[package.dependencies] +googleapis-common-protos = {version = ">=1.56.4", optional = true, markers = "extra == \"connect\""} +grpcio = {version = ">=1.48.1", optional = true, markers = "extra == \"connect\""} +grpcio-status = {version = ">=1.48.1", optional = true, markers = "extra == \"connect\""} +numpy = {version = ">=1.15", optional = true, markers = "extra == \"connect\" or extra == \"sql\""} +pandas = {version = ">=1.0.5", optional = true, markers = "extra == \"connect\" or extra == \"sql\""} +py4j = "0.10.9.7" +pyarrow = {version = ">=1.0.0", optional = true, markers = "extra == \"connect\" or extra == \"sql\""} + +[package.extras] +connect = ["googleapis-common-protos (>=1.56.4)", "grpcio (>=1.48.1)", "grpcio-status (>=1.48.1)", "numpy (>=1.15)", "pandas (>=1.0.5)", "pyarrow (>=1.0.0)"] +ml = ["numpy (>=1.15)"] +mllib = ["numpy (>=1.15)"] +pandas-on-spark = ["numpy (>=1.15)", "pandas (>=1.0.5)", "pyarrow (>=1.0.0)"] +sql = ["numpy (>=1.15)", "pandas (>=1.0.5)", "pyarrow (>=1.0.0)"] + +[[package]] +name = "python-dateutil" +version = "2.8.2" +description = "Extensions to the standard Python datetime module" +optional = false +python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,>=2.7" +files = [ + {file = "python-dateutil-2.8.2.tar.gz", hash = "sha256:0123cacc1627ae19ddf3c27a5de5bd67ee4586fbdd6440d9748f8abb483d3e86"}, + {file = "python_dateutil-2.8.2-py2.py3-none-any.whl", hash = "sha256:961d03dc3453ebbc59dbdea9e4e11c5651520a876d0f4db161e8674aae935da9"}, +] + +[package.dependencies] +six = ">=1.5" + +[[package]] +name = "pytz" +version = "2023.3" +description = "World timezone definitions, modern and historical" +optional = false +python-versions = "*" +files = [ + {file = "pytz-2023.3-py2.py3-none-any.whl", hash = "sha256:a151b3abb88eda1d4e34a9814df37de2a80e301e68ba0fd856fb9b46bfbbbffb"}, + {file = "pytz-2023.3.tar.gz", hash = "sha256:1d8ce29db189191fb55338ee6d0387d82ab59f3d00eac103412d64e0ebd0c588"}, +] + +[[package]] +name = "six" +version = "1.16.0" +description = "Python 2 and 3 compatibility utilities" +optional = false +python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*" +files = [ + {file = "six-1.16.0-py2.py3-none-any.whl", hash = "sha256:8abb2f1d86890a2dfb989f9a77cfcfd3e47c2a354b01111771326f8aa26e0254"}, + {file = "six-1.16.0.tar.gz", hash = 
"sha256:1e61c37477a1626458e36f7b1d82aa5c9b094fa4802892072e49de9c60c4c926"}, +] + +[[package]] +name = "tzdata" +version = "2023.3" +description = "Provider of IANA time zone data" +optional = false +python-versions = ">=2" +files = [ + {file = "tzdata-2023.3-py2.py3-none-any.whl", hash = "sha256:7e65763eef3120314099b6939b5546db7adce1e7d6f2e179e3df563c70511eda"}, + {file = "tzdata-2023.3.tar.gz", hash = "sha256:11ef1e08e54acb0d4f95bdb1be05da659673de4acbd21bf9c69e94cc5e907a3a"}, +] + +[metadata] +lock-version = "2.0" +python-versions = "^3.11" +content-hash = "419289b35beab928ed8e691e696517bd8b1c093630d90d8d8568c6b9ba1b9f2b" diff --git a/tests/integration/python/pyproject.toml b/tests/integration/python/pyproject.toml new file mode 100644 index 0000000..9f8a0c0 --- /dev/null +++ b/tests/integration/python/pyproject.toml @@ -0,0 +1,16 @@ +[tool.poetry] +name = "icelake-integration-tests" +version = "0.0.9" +description = "" +authors = ["Renjie Liu "] +readme = "README.md" +packages = [{include = "icelake_integration_tests"}] + +[tool.poetry.dependencies] +python = "^3.11" +pyspark = { version = "3.4.1", extras = ["sql", "connect"] } + + +[build-system] +requires = ["poetry-core"] +build-backend = "poetry.core.masonry.api" diff --git a/tests/integration/run.sh b/tests/integration/run.sh new file mode 100644 index 0000000..a15797e --- /dev/null +++ b/tests/integration/run.sh @@ -0,0 +1,31 @@ +#!/usr/bin/env bash + +set -ex + +SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd ) +PROJ_DIR="$SCRIPT_DIR/../.." + +cd "$SCRIPT_DIR"/docker + +docker compose up -d --wait spark + +cd "$SCRIPT_DIR"/python +poetry update +poetry run python init.py -s sc://localhost:15002 -f "$SCRIPT_DIR"/testdata/insert1.csv + +"$PROJ_DIR"/target/debug/icelake-integration-tests \ + --s3-bucket icebergdata \ + --s3-endpoint http://localhost:9000 \ + --s3-username admin \ + --s3-password password \ + --s3-region us-east-1 \ + -t "demo/s1/t1" \ + -c "$SCRIPT_DIR"/testdata/insert2.csv + + + +cd "$SCRIPT_DIR"/python +poetry run python check.py -s sc://localhost:15002 -f "$SCRIPT_DIR"/testdata/query1.csv + +cd "$SCRIPT_DIR"/docker +docker compose down -v --remove-orphans diff --git a/tests/integration/rust/Cargo.toml b/tests/integration/rust/Cargo.toml new file mode 100644 index 0000000..1c59b37 --- /dev/null +++ b/tests/integration/rust/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "icelake-integration-tests" +version = { workspace = true } +edition = { workspace = true } +license = { workspace = true } +description = "Pure Rust Iceberg Implementation" + +[dependencies] +icelake = { path = "../../../icelake" } +log = { workspace = true } +env_logger = { workspace = true } +tokio = { workspace = true } +opendal = { workspace = true } +serde = { workspace = true } +csv = { workspace = true } +arrow = { workspace = true } +clap = { workspace = true } diff --git a/tests/integration/rust/src/main.rs b/tests/integration/rust/src/main.rs new file mode 100644 index 0000000..df6814c --- /dev/null +++ b/tests/integration/rust/src/main.rs @@ -0,0 +1,108 @@ +use arrow::csv::ReaderBuilder; +use arrow::datatypes::{DataType, Field, Schema}; +use arrow::record_batch::RecordBatch; +use clap::Parser; +use icelake::transaction::Transaction; +use icelake::Table; +use opendal::services::S3; +use opendal::Operator; +use std::fs::File; +use std::sync::Arc; + +#[derive(Parser, Debug)] +struct Args { + #[arg(long)] + s3_bucket: String, + #[arg(long)] + s3_endpoint: String, + #[arg(long)] + s3_username: String, + 
#[arg(long)] + s3_password: String, + #[arg(long)] + s3_region: String, + + #[arg(short, long)] + table_path: String, + #[arg(short, long)] + csv_file: String, +} + +struct TestFixture { + args: Args, +} + +impl TestFixture { + async fn write_data_with_icelake(&mut self) { + let mut table = create_icelake_table(&self.args).await; + log::info!( + "Real path of table is: {}", + table.current_table_metadata().location + ); + + let records = read_records_to_arrow(self.args.csv_file.as_str()); + + let mut task_writer = table.task_writer().await.unwrap(); + + for record_batch in &records { + log::info!( + "Insert record batch with {} records using icelake.", + record_batch.num_rows() + ); + task_writer.write(record_batch).await.unwrap(); + } + + let result = task_writer.close().await.unwrap(); + log::debug!("Insert {} data files: {:?}", result.len(), result); + + // Commit table transaction + { + let mut tx = Transaction::new(&mut table); + tx.append_file(result); + tx.commit().await.unwrap(); + } + } +} + +async fn prepare_env() -> TestFixture { + env_logger::init(); + + TestFixture { + args: Args::parse(), + } +} + +async fn create_icelake_table(args: &Args) -> Table { + let mut builder = S3::default(); + builder.root(args.table_path.as_str()); + builder.bucket(args.s3_bucket.as_str()); + builder.endpoint(args.s3_endpoint.as_str()); + builder.access_key_id(args.s3_username.as_str()); + builder.secret_access_key(args.s3_password.as_str()); + builder.region(args.s3_region.as_str()); + + let op = Operator::new(builder).unwrap().finish(); + Table::open_with_op(op).await.unwrap() +} + +fn read_records_to_arrow(path: &str) -> Vec<RecordBatch> { + let schema = Schema::new(vec![ + Field::new("id", DataType::Int64, false), + Field::new("name", DataType::Utf8, false), + Field::new("distance", DataType::Int64, false), + ]); + + let csv_reader = ReaderBuilder::new(Arc::new(schema)) + .has_header(false) + .build(File::open(path).unwrap()) + .unwrap(); + + csv_reader.map(|r| r.unwrap()).collect::<Vec<RecordBatch>>() +} + +#[tokio::main] +async fn main() { + let mut fixture = prepare_env().await; + + fixture.write_data_with_icelake().await; +} diff --git a/tests/integration/testdata/insert1.csv b/tests/integration/testdata/insert1.csv new file mode 100644 index 0000000..5775ec5 --- /dev/null +++ b/tests/integration/testdata/insert1.csv @@ -0,0 +1,2 @@ +1,abc,1000 +2,def,2000 diff --git a/tests/integration/testdata/insert2.csv b/tests/integration/testdata/insert2.csv new file mode 100644 index 0000000..71af3dc --- /dev/null +++ b/tests/integration/testdata/insert2.csv @@ -0,0 +1,3 @@ +3,abc,3000 +4,def,4000 +5,ghi,5000 diff --git a/tests/integration/testdata/query1.csv b/tests/integration/testdata/query1.csv new file mode 100644 index 0000000..33aa6b8 --- /dev/null +++ b/tests/integration/testdata/query1.csv @@ -0,0 +1,5 @@ +1,abc,1000 +2,def,2000 +3,abc,3000 +4,def,4000 +5,ghi,5000
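
Note: run.sh above finishes by invoking check.py against tests/integration/testdata/query1.csv, which is the union of insert1.csv (written through Spark) and insert2.csv (written through the icelake binary). The snippet below is a minimal sketch of that kind of verification step, assuming the same Spark Connect URL, the s1.t1 table created by the init SQL, and the (id, name, distance) CSV layout; the names and comparison logic are illustrative and not the patch's actual check.py implementation.

# Illustrative sketch only: compares rows in s1.t1 against an expected CSV.
# Assumes pyspark[connect] 3.4.x and the table/CSV layout used elsewhere in this patch.
from pyspark.sql import SparkSession
import argparse
import csv


def check(args):
    spark = SparkSession.builder.remote(args.sparkurl).getOrCreate()

    # Expected rows from the CSV, parsed to match the table's bigint/string/bigint columns.
    with open(args.file, newline='') as f:
        expected = sorted((int(r[0]), r[1], int(r[2])) for r in csv.reader(f))

    # Actual rows written by Spark and by the icelake integration binary.
    actual = [(row.id, row.name, row.distance)
              for row in spark.sql("SELECT id, name, distance FROM s1.t1 ORDER BY id").collect()]

    assert actual == expected, f"mismatch:\nexpected={expected}\nactual={actual}"
    print("Check passed.")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Check icelake output with spark')
    parser.add_argument('-s', dest='sparkurl', type=str, help='Spark remote url')
    parser.add_argument('-f', dest='file', type=str, help='Path to expected csv file')
    check(parser.parse_args())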