Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
704 changes: 704 additions & 0 deletions crates/integrations/datafusion/tests/blob_tests.rs

Large diffs are not rendered by default.

318 changes: 294 additions & 24 deletions crates/paimon/src/arrow/format/blob.rs

Large diffs are not rendered by default.

17 changes: 14 additions & 3 deletions crates/paimon/src/arrow/format/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
// under the License.

mod avro;
mod blob;
pub(crate) mod blob;
mod orc;
mod parquet;
#[cfg(feature = "vortex")]
Expand Down Expand Up @@ -88,12 +88,18 @@ pub(crate) trait FormatFileWriter: Send {
}

/// Create a format reader based on the file extension.
pub(crate) fn create_format_reader(path: &str) -> crate::Result<Box<dyn FormatFileReader>> {
pub(crate) fn create_format_reader(
path: &str,
blob_as_descriptor: bool,
) -> crate::Result<Box<dyn FormatFileReader>> {
let lower = path.to_ascii_lowercase();
if lower.ends_with(".parquet") {
Ok(Box::new(parquet::ParquetFormatReader))
} else if lower.ends_with(".blob") {
Ok(Box::new(blob::BlobFormatReader))
Ok(Box::new(blob::BlobFormatReader::new(
path.to_string(),
blob_as_descriptor,
)))
} else if lower.ends_with(".orc") {
Ok(Box::new(orc::OrcFormatReader))
} else if lower.ends_with(".avro") {
Expand All @@ -117,13 +123,18 @@ pub(crate) async fn create_format_writer(
schema: SchemaRef,
compression: &str,
zstd_level: i32,
file_io: Option<crate::io::FileIO>,
) -> crate::Result<Box<dyn FormatFileWriter>> {
let path = output.location();
let lower = path.to_ascii_lowercase();
if lower.ends_with(".parquet") {
Ok(Box::new(
parquet::ParquetFormatWriter::new(output, schema, compression, zstd_level).await?,
))
} else if lower.ends_with(".blob") {
Ok(Box::new(
blob::BlobFormatWriter::new(output, file_io).await?,
))
} else {
#[cfg(feature = "vortex")]
if lower.ends_with(".vortex") {
Expand Down
201 changes: 201 additions & 0 deletions crates/paimon/src/spec/blob_descriptor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::Error;

/// Version byte that `BlobDescriptor::new` stamps on new descriptors; it is
/// also the highest version `BlobDescriptor::deserialize` accepts.
const CURRENT_VERSION: u8 = 2;
/// Marker written right after the version byte and verified by `deserialize`
/// for versions above 1. The hex digits spell "BLOBDESC" in ASCII; because the
/// value is written with `to_le_bytes`, the bytes appear reversed on the wire.
const MAGIC: u64 = 0x424C4F4244455343; // "BLOBDESC"

/// A serializable reference to blob data: a URI plus an offset and a length.
///
/// NOTE(review): `offset` and `length` presumably describe the byte range of
/// the blob inside the resource at `uri` — confirm against the blob reader.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BlobDescriptor {
// Format version; written as the first serialized byte.
version: u8,
// Location string; serialized as length-prefixed UTF-8 bytes.
uri: String,
// Serialized as a little-endian i64 after the URI bytes.
offset: i64,
// Serialized as a little-endian i64 after `offset`.
length: i64,
}

impl BlobDescriptor {
    /// Creates a descriptor tagged with `CURRENT_VERSION`.
    pub fn new(uri: String, offset: i64, length: i64) -> Self {
        Self {
            version: CURRENT_VERSION,
            uri,
            offset,
            length,
        }
    }

    /// Returns the URI stored in this descriptor.
    pub fn uri(&self) -> &str {
        &self.uri
    }

    /// Returns the stored offset.
    pub fn offset(&self) -> i64 {
        self.offset
    }

    /// Returns the stored length.
    pub fn length(&self) -> i64 {
        self.length
    }

    /// Whether the wire layout for `version` carries the 8-byte magic header.
    ///
    /// `serialize` and `deserialize` both go through this predicate so the two
    /// directions can never disagree about the layout of a given version.
    fn has_magic(version: u8) -> bool {
        version > 1
    }

    /// Encodes this descriptor into its binary form.
    ///
    /// Layout, all integers little-endian:
    ///
    /// ```text
    /// version: u8 | magic: u64 (versions > 1 only) | uri_length: i32
    ///             | uri bytes | offset: i64 | length: i64
    /// ```
    ///
    /// The magic is written only for versions that carry it, so a descriptor
    /// that was parsed from version-1 bytes serializes back into bytes that
    /// `deserialize` accepts again. For `CURRENT_VERSION` descriptors the
    /// output is unchanged.
    pub fn serialize(&self) -> Vec<u8> {
        let uri_bytes = self.uri.as_bytes();
        let uri_length = uri_bytes.len();
        let with_magic = Self::has_magic(self.version);
        let magic_size = if with_magic { 8 } else { 0 };
        let total_size = 1 + magic_size + 4 + uri_length + 8 + 8;
        let mut buf = Vec::with_capacity(total_size);

        buf.push(self.version);
        if with_magic {
            buf.extend_from_slice(&MAGIC.to_le_bytes());
        }
        // The format stores the URI length as an i32; a URI longer than
        // i32::MAX bytes would be truncated by this cast.
        buf.extend_from_slice(&(uri_length as i32).to_le_bytes());
        buf.extend_from_slice(uri_bytes);
        buf.extend_from_slice(&self.offset.to_le_bytes());
        buf.extend_from_slice(&self.length.to_le_bytes());

        buf
    }

    /// Decodes a descriptor from the binary form produced by `serialize`.
    ///
    /// The parsed version byte is kept on the returned value.
    ///
    /// # Errors
    ///
    /// * `Error::Unsupported` when the version byte is above `CURRENT_VERSION`.
    /// * `Error::DataInvalid` when the input is too short, the magic header of
    ///   a version > 1 descriptor does not match, `uri_length` is negative, or
    ///   the URI bytes are not valid UTF-8.
    pub fn deserialize(bytes: &[u8]) -> crate::Result<Self> {
        if bytes.len() < 1 + 8 + 4 {
            return Err(Error::DataInvalid {
                message: "BlobDescriptor bytes too short".to_string(),
                source: None,
            });
        }

        let version = bytes[0];
        if version > CURRENT_VERSION {
            return Err(Error::Unsupported {
                message: format!(
                    "Expecting BlobDescriptor version <= {CURRENT_VERSION}, but found {version}"
                ),
            });
        }

        let mut pos = 1;

        if Self::has_magic(version) {
            // In bounds: the length check above guarantees at least 13 bytes.
            let magic = u64::from_le_bytes(bytes[pos..pos + 8].try_into().unwrap());
            if magic != MAGIC {
                return Err(Error::DataInvalid {
                    message: format!(
                        "Invalid BlobDescriptor: missing magic header. Expected {MAGIC:#X}, found {magic:#X}"
                    ),
                    source: None,
                });
            }
            pos += 8;
        }

        if bytes.len() < pos + 4 {
            return Err(Error::DataInvalid {
                message: "BlobDescriptor bytes too short for uri_length".to_string(),
                source: None,
            });
        }
        let uri_length_raw = i32::from_le_bytes(bytes[pos..pos + 4].try_into().unwrap());
        if uri_length_raw < 0 {
            return Err(Error::DataInvalid {
                message: format!("BlobDescriptor has negative uri_length: {uri_length_raw}"),
                source: None,
            });
        }
        let uri_length = uri_length_raw as usize;
        pos += 4;

        // The URI must be followed by two i64 values (offset and length).
        if bytes.len() < pos + uri_length + 16 {
            return Err(Error::DataInvalid {
                message: "BlobDescriptor bytes too short for uri + offset + length".to_string(),
                source: None,
            });
        }
        let uri = String::from_utf8(bytes[pos..pos + uri_length].to_vec()).map_err(|e| {
            Error::DataInvalid {
                message: format!("Invalid UTF-8 in BlobDescriptor uri: {e}"),
                source: Some(Box::new(e)),
            }
        })?;
        pos += uri_length;

        let offset = i64::from_le_bytes(bytes[pos..pos + 8].try_into().unwrap());
        pos += 8;
        let length = i64::from_le_bytes(bytes[pos..pos + 8].try_into().unwrap());

        Ok(Self {
            version,
            uri,
            offset,
            length,
        })
    }

    /// Cheap header sniff: returns `true` when `bytes` holds at least 9 bytes,
    /// the version byte is at most `CURRENT_VERSION`, and bytes `1..9` contain
    /// `MAGIC` in little-endian order.
    ///
    /// Only the header is inspected; the remainder of the buffer is not
    /// validated, so `deserialize` can still fail on input accepted here.
    pub fn is_blob_descriptor(bytes: &[u8]) -> bool {
        if bytes.len() < 9 {
            return false;
        }
        let version = bytes[0];
        if version > CURRENT_VERSION {
            return false;
        }
        let magic = u64::from_le_bytes(bytes[1..9].try_into().unwrap());
        magic == MAGIC
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Serialized form of a minimal descriptor; the rejection tests corrupt
    /// individual bytes of it.
    fn minimal_descriptor_bytes() -> Vec<u8> {
        BlobDescriptor::new("x".to_string(), 0, 0).serialize()
    }

    /// A descriptor survives a serialize/deserialize cycle unchanged.
    #[test]
    fn test_serialize_deserialize_roundtrip() {
        let original = BlobDescriptor::new("s3://bucket/path/to/blob.blob".to_string(), 100, 2048);
        let decoded = BlobDescriptor::deserialize(&original.serialize()).unwrap();
        assert_eq!(original, decoded);
    }

    /// The header sniff accepts real descriptors and rejects short or
    /// unrelated input.
    #[test]
    fn test_is_blob_descriptor() {
        let encoded = BlobDescriptor::new("file:///tmp/test.blob".to_string(), 0, 1024).serialize();
        assert!(BlobDescriptor::is_blob_descriptor(&encoded));

        let too_short = [0u8; 5];
        assert!(!BlobDescriptor::is_blob_descriptor(&too_short));
        assert!(!BlobDescriptor::is_blob_descriptor(b"not a descriptor"));
    }

    /// A version byte above the supported maximum is an error.
    #[test]
    fn test_deserialize_rejects_future_version() {
        let mut encoded = minimal_descriptor_bytes();
        encoded[0] = 255;
        assert!(BlobDescriptor::deserialize(&encoded).is_err());
    }

    /// A negative `uri_length` is reported as `DataInvalid` with a matching message.
    #[test]
    fn test_deserialize_rejects_negative_uri_length() {
        let mut encoded = minimal_descriptor_bytes();
        // uri_length sits at offset 9: one version byte plus eight magic bytes.
        encoded[9..13].copy_from_slice(&(-1_i32).to_le_bytes());

        let err = BlobDescriptor::deserialize(&encoded).unwrap_err();
        let rejected_for_negative_length = match err {
            Error::DataInvalid { message, .. } => message.contains("negative uri_length"),
            _ => false,
        };
        assert!(rejected_for_negative_length);
    }

    /// Corrupting the first magic byte makes deserialization fail.
    #[test]
    fn test_deserialize_rejects_bad_magic() {
        let mut encoded = minimal_descriptor_bytes();
        encoded[1] = 0xFF;
        assert!(BlobDescriptor::deserialize(&encoded).is_err());
    }
}
29 changes: 28 additions & 1 deletion crates/paimon/src/spec/core_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use std::collections::HashMap;
use std::collections::{HashMap, HashSet};

const DELETION_VECTORS_ENABLED_OPTION: &str = "deletion-vectors.enabled";
const DATA_EVOLUTION_ENABLED_OPTION: &str = "data-evolution.enabled";
Expand Down Expand Up @@ -59,6 +59,8 @@ const DEFAULT_TARGET_FILE_SIZE: i64 = 256 * 1024 * 1024;
const DEFAULT_WRITE_PARQUET_BUFFER_SIZE: i64 = 256 * 1024 * 1024;
const DYNAMIC_BUCKET_TARGET_ROW_NUM_OPTION: &str = "dynamic-bucket.target-row-num";
const DEFAULT_DYNAMIC_BUCKET_TARGET_ROW_NUM: i64 = 200_000;
const BLOB_AS_DESCRIPTOR_OPTION: &str = "blob-as-descriptor";
const BLOB_DESCRIPTOR_FIELD_OPTION: &str = "blob-descriptor-field";

/// Merge engine for primary-key tables.
///
Expand Down Expand Up @@ -324,6 +326,13 @@ impl<'a> CoreOptions<'a> {
.unwrap_or(DEFAULT_TARGET_FILE_SIZE)
}

/// Target size for blob files, read from the `blob.target-file-size` option.
///
/// Falls back to [`Self::target_file_size`] when the option is absent or
/// `parse_memory_size` cannot parse its value.
pub fn blob_target_file_size(&self) -> i64 {
    match self.options.get("blob.target-file-size") {
        Some(raw) => parse_memory_size(raw).unwrap_or_else(|| self.target_file_size()),
        None => self.target_file_size(),
    }
}

/// File format for data files (e.g. "parquet", "orc", "avro", "vortex").
/// Default is "parquet".
pub fn file_format(&self) -> &str {
Expand Down Expand Up @@ -369,6 +378,24 @@ impl<'a> CoreOptions<'a> {
.and_then(|v| v.parse().ok())
.unwrap_or(DEFAULT_DYNAMIC_BUCKET_TARGET_ROW_NUM)
}

/// Whether blob field reads should return serialized BlobDescriptor bytes
/// rather than the actual blob bytes.
///
/// Only the value `true` (compared ASCII case-insensitively) enables this;
/// a missing option or any other value yields `false`.
pub fn blob_as_descriptor(&self) -> bool {
    matches!(
        self.options.get(BLOB_AS_DESCRIPTOR_OPTION),
        Some(flag) if flag.eq_ignore_ascii_case("true")
    )
}

/// Comma-separated BLOB field names stored as serialized BlobDescriptor
/// bytes inline in normal data files (no .blob files for these fields).
pub fn blob_descriptor_fields(&self) -> HashSet<String> {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

blob-descriptor-field needs schema-level validation before we use this set. In Java, every configured field must exist and must be a top-level BLOB field. Here we only parse the option string, so typos or nested / non-BLOB fields are silently accepted and we diverge from Java behavior. Can we validate this during schema construction / table initialization instead of letting it flow into the writer path?

self.options
.get(BLOB_DESCRIPTOR_FIELD_OPTION)
.map(|s| s.split(',').map(|f| f.trim().to_string()).collect())
.unwrap_or_default()
}
}

/// Parse a memory size string to bytes using binary (1024-based) semantics.
Expand Down
3 changes: 3 additions & 0 deletions crates/paimon/src/spec/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
mod binary_row;
pub use binary_row::*;

mod blob_descriptor;
pub use blob_descriptor::BlobDescriptor;

mod data_file;
pub use data_file::*;

Expand Down
14 changes: 2 additions & 12 deletions crates/paimon/src/spec/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,18 +119,8 @@ impl DataType {
}
}

/// Returns whether this type is or contains (recursively) a [`BlobType`].
pub fn contains_blob_type(&self) -> bool {
match self {
DataType::Blob(_) => true,
DataType::Array(v) => v.element_type.contains_blob_type(),
DataType::Map(v) => {
v.key_type.contains_blob_type() || v.value_type.contains_blob_type()
}
DataType::Multiset(v) => v.element_type.contains_blob_type(),
DataType::Row(v) => v.fields.iter().any(|f| f.data_type().contains_blob_type()),
_ => false,
}
/// Returns whether this type is itself the `DataType::Blob` variant.
///
/// Only the outermost variant is matched; nested element, key, value, or
/// field types are not inspected.
pub fn is_blob_type(&self) -> bool {
matches!(self, DataType::Blob(_))
}

/// Returns whether this type is nullable.
Expand Down
Loading
Loading