Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,5 @@ rust-version = "1.86.0"
arrow-array = { version = "57.0", features = ["ffi"] }
arrow-schema = "57.0"
arrow-cast = "57.0"
arrow-select = "57.0"
parquet = "57.0"
tokio = "1.39.2"
5 changes: 2 additions & 3 deletions crates/paimon/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,12 @@ snafu = "0.9.0"
typed-builder = "^0.19"
opendal = { version = "0.55", features = ["services-fs"] }
pretty_assertions = "1"
apache-avro = { version = "0.17", features = ["snappy", "zstandard"] }
serde_avro_fast = { version = "2.0.2", features = ["snappy", "zstandard"] }
indexmap = "2.5.0"
roaring = "0.11"
arrow-array = { workspace = true }
arrow-cast = { workspace = true }
arrow-schema = { workspace = true }
arrow-select = { workspace = true }
futures = "0.3"
parquet = { workspace = true, features = ["async", "zstd"] }
async-stream = "0.3.6"
Expand All @@ -76,5 +75,5 @@ urlencoding = "2.1"
[dev-dependencies]
axum = { version = "0.7", features = ["macros", "tokio", "http1", "http2"] }
rand = "0.8.5"
serde_avro_fast = { version = "2.0.2", features = ["snappy"] }

tempfile = "3"
17 changes: 0 additions & 17 deletions crates/paimon/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,6 @@ pub enum Error {
display("Paimon hitting invalid config: {}", message)
)]
ConfigInvalid { message: String },
#[snafu(
visibility(pub(crate)),
display("Paimon hitting unexpected avro error {}: {:?}", message, source)
)]
DataUnexpected {
message: String,
source: Box<apache_avro::Error>,
},
#[snafu(
visibility(pub(crate)),
display("Paimon hitting invalid file index format: {}", message)
Expand Down Expand Up @@ -127,15 +119,6 @@ impl From<opendal::Error> for Error {
}
}

impl From<apache_avro::Error> for Error {
fn from(source: apache_avro::Error) -> Self {
Error::DataUnexpected {
message: "".to_string(),
source: Box::new(source),
}
}
}

impl From<parquet::errors::ParquetError> for Error {
fn from(source: parquet::errors::ParquetError) -> Self {
Error::ParquetDataUnexpected {
Expand Down
8 changes: 8 additions & 0 deletions crates/paimon/src/spec/data_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,14 @@ pub struct DataFileMeta {
skip_serializing_if = "Option::is_none"
)]
pub write_cols: Option<Vec<String>>,

/// External path for the data file (e.g. when data is stored outside the table directory).
#[serde(
rename = "_EXTERNAL_PATH",
default,
skip_serializing_if = "Option::is_none"
)]
pub external_path: Option<String>,
}

impl Display for DataFileMeta {
Expand Down
17 changes: 8 additions & 9 deletions crates/paimon/src/spec/index_manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,11 @@
use crate::io::FileIO;
use crate::spec::manifest_common::FileKind;
use crate::spec::IndexFileMeta;
use apache_avro::types::Value;
use apache_avro::{from_value, Reader};
use serde::{Deserialize, Serialize};
use serde_avro_fast::object_container_file_encoding::Reader;
use snafu::ResultExt;
use std::fmt::{Display, Formatter};

use crate::Error;
use crate::Result;

/// Manifest entry for index file.
Expand Down Expand Up @@ -76,12 +75,12 @@ impl IndexManifest {

/// Read index manifest entries from Avro-encoded bytes.
pub fn read_from_bytes(bytes: &[u8]) -> Result<Vec<IndexManifestEntry>> {
let reader = Reader::new(bytes).map_err(Error::from)?;
let records = reader
.collect::<std::result::Result<Vec<Value>, _>>()
.map_err(Error::from)?;
let values = Value::Array(records);
from_value::<Vec<IndexManifestEntry>>(&values).map_err(Error::from)
let mut reader = Reader::from_slice(bytes)
.whatever_context::<_, crate::Error>("read index manifest avro")?;
reader
.deserialize::<IndexManifestEntry>()
.collect::<std::result::Result<Vec<_>, _>>()
.whatever_context::<_, crate::Error>("deserialize index manifest entry")
}
}

Expand Down
17 changes: 8 additions & 9 deletions crates/paimon/src/spec/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,9 @@

use crate::io::FileIO;
use crate::spec::manifest_entry::ManifestEntry;
use apache_avro::types::Value;
use apache_avro::{from_value, Reader};
use serde_avro_fast::object_container_file_encoding::Reader;
use snafu::ResultExt;

use crate::Error;
use crate::Result;

/// Manifest file reader and writer.
Expand Down Expand Up @@ -60,12 +59,12 @@ impl Manifest {
/// # Returns
/// A vector of ManifestEntry records
fn read_from_bytes(bytes: &[u8]) -> Result<Vec<ManifestEntry>> {
let reader = Reader::new(bytes).map_err(Error::from)?;
let records = reader
.collect::<std::result::Result<Vec<Value>, _>>()
.map_err(Error::from)?;
let values = Value::Array(records);
from_value::<Vec<ManifestEntry>>(&values).map_err(Error::from)
let mut reader =
Reader::from_slice(bytes).whatever_context::<_, crate::Error>("read manifest avro")?;
reader
.deserialize::<ManifestEntry>()
.collect::<std::result::Result<Vec<_>, _>>()
.whatever_context::<_, crate::Error>("deserialize manifest entry")
}
}

Expand Down
6 changes: 6 additions & 0 deletions crates/paimon/src/spec/manifest_entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ pub struct Identifier {
pub bucket: i32,
pub level: i32,
pub file_name: String,
pub extra_files: Vec<String>,
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Identifier must be aligned with Java, otherwise there will be exceptions.

pub embedded_index: Option<Vec<u8>>,
pub external_path: Option<String>,
}

/// Entry of a manifest file, representing an addition / deletion of a data file.
Expand Down Expand Up @@ -90,6 +93,9 @@ impl ManifestEntry {
bucket: self.bucket,
level: self.file.level,
file_name: self.file.file_name.clone(),
extra_files: self.file.extra_files.clone(),
embedded_index: self.file.embedded_index.clone(),
external_path: self.file.external_path.clone(),
}
}

Expand Down
1 change: 1 addition & 0 deletions crates/paimon/src/spec/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ pub use manifest::Manifest;
mod manifest_common;
pub use manifest_common::FileKind;
mod manifest_entry;
pub use manifest_entry::Identifier;
pub use manifest_entry::ManifestEntry;
mod objects_file;
pub use objects_file::from_avro_bytes;
Expand Down
19 changes: 10 additions & 9 deletions crates/paimon/src/spec/objects_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,18 @@
// specific language governing permissions and limitations
// under the License.

use crate::Error;
use apache_avro::types::Value;
use apache_avro::{from_value, Reader};
use serde::de::DeserializeOwned;
use serde_avro_fast::object_container_file_encoding::Reader;
use snafu::ResultExt;

#[allow(dead_code)]
pub fn from_avro_bytes<T: DeserializeOwned>(bytes: &[u8]) -> crate::Result<Vec<T>> {
let reader = Reader::new(bytes).map_err(Error::from)?;
let records = reader
.collect::<Result<Vec<Value>, _>>()
.map_err(Error::from)?;
let values = Value::Array(records);
from_value::<Vec<T>>(&values).map_err(Error::from)
let mut reader = Reader::from_slice(bytes)
.whatever_context::<_, crate::Error>("read avro object container")?;
reader
.deserialize::<T>()
.collect::<std::result::Result<Vec<_>, _>>()
.whatever_context::<_, crate::Error>("deserialize avro records")
}

#[cfg(test)]
Expand Down Expand Up @@ -122,6 +121,7 @@ mod tests {
embedded_index: None,
first_row_id: None,
write_cols: None,
external_path: None,
},
2
),
Expand Down Expand Up @@ -158,6 +158,7 @@ mod tests {
embedded_index: None,
first_row_id: None,
write_cols: None,
external_path: None,
},
2
),
Expand Down
1 change: 1 addition & 0 deletions crates/paimon/src/table/bin_pack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ mod tests {
embedded_index: None,
first_row_id: None,
write_cols: None,
external_path: None,
}
}

Expand Down
94 changes: 51 additions & 43 deletions crates/paimon/src/table/table_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use crate::table::source::{DataSplit, DataSplitBuilder, DeletionFile, PartitionB
use crate::table::SnapshotManager;
use crate::table::TagManager;
use crate::Error;
use futures::{StreamExt, TryStreamExt};
use std::cmp::Ordering;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
Expand Down Expand Up @@ -74,13 +75,17 @@ async fn read_all_manifest_entries(
manifest_files.extend(delta);

let manifest_path_prefix = format!("{}/{}", table_path.trim_end_matches('/'), MANIFEST_DIR);
let mut all_entries = Vec::new();
// todo: consider use multiple-threads read manifest
for meta in manifest_files {
let path = format!("{}/{}", manifest_path_prefix, meta.file_name());
let entries = crate::spec::Manifest::read(file_io, &path).await?;
all_entries.extend(entries);
}
let all_entries: Vec<ManifestEntry> = futures::stream::iter(manifest_files)
.map(|meta| {
let path = format!("{}/{}", manifest_path_prefix, meta.file_name());
async move { crate::spec::Manifest::read(file_io, &path).await }
})
.buffered(64)
.try_collect::<Vec<_>>()
.await?
.into_iter()
.flatten()
.collect();
Ok(all_entries)
}

Expand Down Expand Up @@ -953,7 +958,44 @@ mod tests {
};
use crate::table::source::DeletionFile;
use crate::Error;
use chrono::Utc;
use chrono::{DateTime, Utc};

/// Test fixture: builds a `DataFileMeta` for data-evolution test cases.
///
/// Only `name`, `file_size`, `row_count`, `max_seq` (stored as the maximum
/// sequence number) and `first_row_id` are taken from the caller. Keys,
/// stats and extra files are empty; the minimum sequence number, schema id
/// and level are `0`; the creation time is built from timestamp `(0, 0)`;
/// every other optional field is `None`.
fn make_evo_file(
    name: &str,
    file_size: i64,
    row_count: i64,
    max_seq: i64,
    first_row_id: Option<i64>,
) -> DataFileMeta {
    // Both stats fields use the same all-empty value; build it on demand.
    let empty_stats = || BinaryTableStats::new(Vec::new(), Vec::new(), Vec::new());
    // Fixed timestamp so the fixture does not depend on the wall clock.
    let fixed_creation_time = DateTime::<Utc>::from_timestamp(0, 0).unwrap();

    DataFileMeta {
        file_name: name.to_owned(),
        file_size,
        row_count,
        min_key: vec![],
        max_key: vec![],
        key_stats: empty_stats(),
        value_stats: empty_stats(),
        min_sequence_number: 0,
        max_sequence_number: max_seq,
        schema_id: 0,
        level: 0,
        extra_files: vec![],
        creation_time: fixed_creation_time,
        delete_row_count: None,
        embedded_index: None,
        first_row_id,
        write_cols: None,
        external_path: None,
    }
}

/// Projects each group of files onto the borrowed `file_name` of its members,
/// preserving both the group order and the order of files within a group.
fn file_names(groups: &[Vec<DataFileMeta>]) -> Vec<Vec<&str>> {
    let mut names_per_group = Vec::with_capacity(groups.len());
    for group in groups {
        let mut names = Vec::with_capacity(group.len());
        for file in group {
            names.push(file.file_name.as_str());
        }
        names_per_group.push(names);
    }
    names_per_group
}

struct SerializedBinaryRowBuilder {
arity: i32,
Expand Down Expand Up @@ -1106,44 +1148,10 @@ mod tests {
embedded_index: None,
first_row_id: None,
write_cols: None,
external_path: None,
}
}

/// Test fixture: builds a `DataFileMeta` for data-evolution test cases.
///
/// `max_seq` is used for both the minimum and the maximum sequence number,
/// and the creation time is the current UTC time at the moment of the call.
/// Keys, stats and extra files are empty; schema id and level are `0`; the
/// remaining optional fields are `None`.
fn make_evo_file(
    name: &str,
    file_size: i64,
    row_count: i64,
    max_seq: i64,
    first_row_id: Option<i64>,
) -> DataFileMeta {
    // Shared constructor for the two all-empty stats fields.
    let no_stats = || BinaryTableStats::new(vec![], vec![], vec![]);
    let file_name = String::from(name);

    DataFileMeta {
        file_name,
        file_size,
        row_count,
        min_key: vec![],
        max_key: vec![],
        key_stats: no_stats(),
        value_stats: no_stats(),
        min_sequence_number: max_seq,
        max_sequence_number: max_seq,
        schema_id: 0,
        level: 0,
        extra_files: vec![],
        creation_time: Utc::now(),
        delete_row_count: None,
        embedded_index: None,
        first_row_id,
        write_cols: None,
    }
}

/// Returns, for every group, the list of `file_name`s of its files as
/// borrowed string slices, in the same order as the input.
fn file_names(groups: &[Vec<DataFileMeta>]) -> Vec<Vec<&str>> {
    let mut grouped: Vec<Vec<&str>> = Vec::with_capacity(groups.len());
    for files in groups {
        grouped.push(files.iter().map(|meta| meta.file_name.as_str()).collect());
    }
    grouped
}

#[test]
fn test_partition_matches_predicate_decode_failure_fails_open() {
let predicate = PredicateBuilder::new(&partition_string_field())
Expand Down
Loading