Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 71 additions & 0 deletions crates/paimon/src/deletion_vector/factory.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#![allow(dead_code)]

use crate::deletion_vector::core::DeletionVector;
use crate::io::{FileIO, FileRead};
use crate::Result;
use std::collections::HashMap;
use std::sync::Arc;

/// Factory for creating DeletionVector instances from files and metadata.
///
/// Corresponds to Java's [DeletionVector.Factory](https://github.com/apache/paimon/blob/master/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVector.java)
/// (create(fileName) -> Optional<DeletionVector>). Built via [DeletionVectorFactory::new] from
/// `(data file name, Option<DeletionFile>)` entries; per-file lookup is done with
/// [DeletionVectorFactory::get_deletion_vector].
pub struct DeletionVectorFactory {
    /// Map from data file name to its loaded deletion vector. Data files
    /// without a deletion file are simply absent from this map.
    deletion_vectors: HashMap<String, Arc<DeletionVector>>,
}

impl DeletionVectorFactory {
/// Create a DeletionVectorFactory from data file names and their optional deletion files.
/// Same as Java's `DeletionVector.factory(fileIO, files, deletionFiles)`: for each file that
/// has a DeletionFile, reads path/offset/length and loads the DV.
///
/// # Errors
/// Propagates any I/O or deserialization error from reading a deletion vector.
pub async fn new(
    file_io: &FileIO,
    entries: Vec<(String, Option<crate::DeletionFile>)>,
) -> Result<Self> {
    // Entries without a deletion file are skipped, so `entries.len()` is an
    // upper bound on the final map size — preallocate once instead of growing.
    let mut deletion_vectors = HashMap::with_capacity(entries.len());
    for (data_file_name, opt_df) in entries {
        // Explicit `None` arm (instead of a catch-all `_`) keeps the match
        // exhaustive over Option; binding `df` by value avoids the borrow.
        let df = match opt_df {
            Some(d) => d,
            None => continue,
        };
        let dv = Self::read(file_io, &df).await?;
        deletion_vectors.insert(data_file_name, Arc::new(dv));
    }
    Ok(DeletionVectorFactory { deletion_vectors })
}

/// Get the deletion vector loaded for a specific data file, if any.
///
/// Returns a cheap `Arc` clone (refcount bump); `None` means the data file
/// had no deletion file when this factory was built.
pub fn get_deletion_vector(&self, data_file_name: &str) -> Option<Arc<DeletionVector>> {
    self.deletion_vectors
        .get(data_file_name)
        .map(Arc::clone)
}

/// Read a single DeletionVector from storage using DeletionFile (path/offset/length).
/// Same as Java's DeletionVector.read(FileIO, DeletionFile).
async fn read(file_io: &FileIO, df: &crate::DeletionFile) -> Result<DeletionVector> {
    // Open the index file that contains the serialized bitmap region.
    let input = file_io.new_input(df.path())?;
    let reader = input.reader().await?;
    // NOTE(review): offset/length are i64 cast to u64; negative values would
    // wrap — assumed non-negative as produced by the writer. TODO confirm.
    let start = df.offset() as u64;
    let end = start.saturating_add(df.length() as u64);
    let bytes = reader.read(start..end).await?;
    // Mirrors Java's `DeletionVector.read(dis, deletionFile.length())`: the
    // declared length is passed through so the deserializer can validate it.
    DeletionVector::read_from_bytes(&bytes, Some(df.length() as u64))
}
}
1 change: 1 addition & 0 deletions crates/paimon/src/deletion_vector/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@
// under the License.

mod core;
mod factory;
12 changes: 8 additions & 4 deletions crates/paimon/src/file_index/file_index_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,12 +252,16 @@ impl FileIndex {
&self,
index_info: &IndexInfo,
) -> crate::Result<Bytes> {
let data_bytes = self
.reader
self.reader
.read(index_info.start_pos as u64..(index_info.start_pos + index_info.length) as u64)
.await?;
.await
}

Ok(data_bytes)
/// Read bytes from the index file at the specified position and length.
///
/// The end bound is computed with `saturating_add` so an overflowing
/// `start + length` cannot panic (debug builds) or wrap (release builds),
/// matching the pattern used elsewhere in this crate for ranged reads.
///
/// NOTE(review): negative `start`/`length` still wrap through the `as u64`
/// casts — callers are expected to pass non-negative values. TODO confirm.
pub async fn read_bytes(&self, start: i64, length: i64) -> crate::Result<Bytes> {
    let begin = start as u64;
    let end = begin.saturating_add(length as u64);
    self.reader.read(begin..end).await
}
}

Expand Down
3 changes: 2 additions & 1 deletion crates/paimon/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,6 @@ pub use catalog::Catalog;
pub use catalog::FileSystemCatalog;

pub use table::{
DataSplit, DataSplitBuilder, Plan, ReadBuilder, SnapshotManager, Table, TableRead, TableScan,
DataSplit, DataSplitBuilder, DeletionFile, PartitionBucket, Plan, ReadBuilder, SnapshotManager,
Table, TableRead, TableScan,
};
33 changes: 33 additions & 0 deletions crates/paimon/src/spec/index_manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,17 @@
// specific language governing permissions and limitations
// under the License.

use crate::io::FileIO;
use crate::spec::manifest_common::FileKind;
use crate::spec::IndexFileMeta;
use apache_avro::types::Value;
use apache_avro::{from_value, Reader};
use serde::{Deserialize, Serialize};
use std::fmt::{Display, Formatter};

use crate::Error;
use crate::Result;

/// Manifest entry for index file.
///
/// Impl Reference: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestEntry.java>
Expand Down Expand Up @@ -52,6 +58,33 @@ impl Display for IndexManifestEntry {
}
}

/// Index manifest file reader (entries describing index files per partition/bucket).
///
/// Stateless namespace type: all functionality lives in associated functions
/// (`read`, `read_from_bytes`), so no instance is ever constructed.
///
/// Reference: [org.apache.paimon.index.IndexFileHandler](https://github.com/apache/paimon/blob/release-1.3/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java)
pub struct IndexManifest;

impl IndexManifest {
    /// Read index manifest entries from a file.
    ///
    /// Returns an empty vector when the file does not exist, so callers can
    /// treat "no index manifest" and "empty index manifest" uniformly.
    ///
    /// # Errors
    /// Propagates I/O errors and Avro decoding errors.
    pub async fn read(file_io: &FileIO, path: &str) -> Result<Vec<IndexManifestEntry>> {
        let input_file = file_io.new_input(path)?;
        if !input_file.exists().await? {
            return Ok(Vec::new());
        }
        let content = input_file.read().await?;
        Self::read_from_bytes(&content)
    }

    /// Read index manifest entries from Avro-encoded bytes.
    ///
    /// # Errors
    /// Returns an error if the bytes are not a valid Avro container or if a
    /// record does not deserialize into [IndexManifestEntry].
    pub fn read_from_bytes(bytes: &[u8]) -> Result<Vec<IndexManifestEntry>> {
        // `?` applies `From<apache_avro::Error> for Error` automatically, so
        // the explicit `.map_err(Error::from)` calls were redundant.
        let reader = Reader::new(bytes)?;
        let records = reader.collect::<std::result::Result<Vec<Value>, _>>()?;
        // Wrap all records in a single Avro array value so the whole manifest
        // deserializes in one `from_value` call.
        Ok(from_value::<Vec<IndexManifestEntry>>(&Value::Array(records))?)
    }
}

#[cfg(test)]
mod tests {
use indexmap::IndexMap;
Expand Down
1 change: 1 addition & 0 deletions crates/paimon/src/spec/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ mod index_file_meta;
pub use index_file_meta::*;

mod index_manifest;
pub use index_manifest::{IndexManifest, IndexManifestEntry};
mod manifest;
pub use manifest::Manifest;
mod manifest_common;
Expand Down
2 changes: 1 addition & 1 deletion crates/paimon/src/table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use arrow_array::RecordBatch;
use futures::stream::BoxStream;
pub use read_builder::{ReadBuilder, TableRead};
pub use snapshot_manager::SnapshotManager;
pub use source::{DataSplit, DataSplitBuilder, Plan};
pub use source::{DataSplit, DataSplitBuilder, DeletionFile, PartitionBucket, Plan};
pub use table_scan::TableScan;

use crate::catalog::Identifier;
Expand Down
117 changes: 115 additions & 2 deletions crates/paimon/src/table/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,86 @@
// specific language governing permissions and limitations
// under the License.

//! Table source types: DataSplit, Plan, and related structs.
//! Table source types: DataSplit, Plan, DeletionFile, and related structs.
//!
//! Reference: [org.apache.paimon.table.source](https://github.com/apache/paimon/blob/master/paimon-core/src/main/java/org/apache/paimon/table/source/).

#![allow(dead_code)]

use crate::spec::{BinaryRow, DataFileMeta};
use serde::{Deserialize, Serialize};

// ======================= DeletionFile ===============================

/// Deletion file for a data file: describes a region in a file that stores deletion vector bitmap.
///
/// Format of the region (first 4 bytes length, then magic, then RoaringBitmap content):
/// - First 4 bytes: length (should equal [Self::length]).
/// - Next 4 bytes: magic number (1581511376).
/// - Remaining: serialized RoaringBitmap.
///
/// Reference: [org.apache.paimon.table.source.DeletionFile](https://github.com/apache/paimon/blob/release-1.3/paimon-core/src/main/java/org/apache/paimon/table/source/DeletionFile.java)
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct DeletionFile {
    /// Path of the file containing the deletion vector (e.g. index file path).
    path: String,
    /// Starting byte offset of the deletion vector region within that file.
    offset: i64,
    /// Length in bytes of the deletion vector region.
    length: i64,
    /// Number of deleted rows (cardinality of the bitmap), if known.
    cardinality: Option<i64>,
}

impl DeletionFile {
    /// Creates a new deletion-file descriptor.
    ///
    /// `path` accepts any type convertible into `String` (`&str`, `String`, …);
    /// this is a backward-compatible generalization — existing callers that
    /// pass an owned `String` are unaffected.
    pub fn new(path: impl Into<String>, offset: i64, length: i64, cardinality: Option<i64>) -> Self {
        Self {
            path: path.into(),
            offset,
            length,
            cardinality,
        }
    }

    /// Path of the file containing the deletion vector data.
    pub fn path(&self) -> &str {
        &self.path
    }

    /// Starting offset of the deletion vector data in the file.
    pub fn offset(&self) -> i64 {
        self.offset
    }

    /// Length in bytes of the deletion vector data.
    pub fn length(&self) -> i64 {
        self.length
    }

    /// Number of deleted rows (bitmap cardinality), if known.
    pub fn cardinality(&self) -> Option<i64> {
        self.cardinality
    }
}

// ======================= PartitionBucket ===============================

/// Key for grouping splits by partition and bucket: (partition bytes, bucket id).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct PartitionBucket {
    pub partition: Vec<u8>,
    pub bucket: i32,
}

impl PartitionBucket {
    /// Builds a grouping key from serialized partition bytes and a bucket id.
    pub fn new(partition: Vec<u8>, bucket: i32) -> Self {
        PartitionBucket { partition, bucket }
    }
}

// ======================= DataSplit ===============================

/// Input split for reading: partition + bucket + list of data files.
/// Input split for reading: partition + bucket + list of data files and optional deletion files.
///
/// Reference: [org.apache.paimon.table.source.DataSplit](https://github.com/apache/paimon/blob/release-1.3/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java)
#[derive(Debug)]
Expand All @@ -36,6 +105,9 @@ pub struct DataSplit {
bucket_path: String,
total_buckets: i32,
data_files: Vec<DataFileMeta>,
/// Deletion file for each data file, same order as `data_files`.
/// `None` at index `i` means no deletion file for `data_files[i]` (matches Java getDeletionFiles() / List<DeletionFile> with null elements).
data_deletion_files: Option<Vec<Option<DeletionFile>>>,
}

impl DataSplit {
Expand All @@ -59,6 +131,34 @@ impl DataSplit {
&self.data_files
}

/// Deletion files for each data file (same order as `data_files`); an inner
/// `None` means the corresponding data file has no deletion file. Returns
/// `None` when no deletion-file list was set on this split at all.
pub fn data_deletion_files(&self) -> Option<&[Option<DeletionFile>]> {
    match &self.data_deletion_files {
        Some(files) => Some(files.as_slice()),
        None => None,
    }
}

/// Returns the deletion file for the data file at the given index, if any.
/// `None` is returned when no list was set, the index is out of bounds, or
/// that slot holds no deletion file.
pub fn deletion_file_for_data_file_index(&self, index: usize) -> Option<&DeletionFile> {
    self.data_deletion_files.as_ref()?.get(index)?.as_ref()
}

/// Returns the deletion file for the given data file (matched by file name), if any.
pub fn deletion_file_for_data_file(&self, file: &DataFileMeta) -> Option<&DeletionFile> {
    // Locate the data file's position; deletion files are stored in the
    // same order as `data_files`.
    let (index, _) = self
        .data_files
        .iter()
        .enumerate()
        .find(|(_, candidate)| candidate.file_name == file.file_name)?;
    self.deletion_file_for_data_file_index(index)
}

/// Full path for a single data file in this split (bucket_path + "/" + file_name).
/// A trailing slash on `bucket_path` is stripped so the result never contains "//".
pub fn data_file_path(&self, file: &DataFileMeta) -> String {
    format!(
        "{}/{}",
        self.bucket_path.trim_end_matches('/'),
        file.file_name
    )
}

/// Iterate over each data file in this split, yielding `(path, &DataFileMeta)`.
/// Use this to read each data file one by one (e.g. in ArrowReader).
pub fn data_file_entries(&self) -> impl Iterator<Item = (String, &DataFileMeta)> + '_ {
Expand Down Expand Up @@ -92,6 +192,8 @@ pub struct DataSplitBuilder {
bucket_path: Option<String>,
total_buckets: i32,
data_files: Option<Vec<DataFileMeta>>,
/// Same length as data_files; `None` at index i = no deletion file for data_files[i].
data_deletion_files: Option<Vec<Option<DeletionFile>>>,
}

impl DataSplitBuilder {
Expand All @@ -103,6 +205,7 @@ impl DataSplitBuilder {
bucket_path: None,
total_buckets: -1,
data_files: None,
data_deletion_files: None,
}
}

Expand Down Expand Up @@ -131,6 +234,15 @@ impl DataSplitBuilder {
self
}

/// Sets deletion files; length must match data_files. Use `None` at index i when data_files[i] has no deletion file.
pub fn with_data_deletion_files(
mut self,
data_deletion_files: Vec<Option<DeletionFile>>,
) -> Self {
self.data_deletion_files = Some(data_deletion_files);
self
}

pub fn build(self) -> crate::Result<DataSplit> {
if self.snapshot_id == -1 {
return Err(crate::Error::UnexpectedError {
Expand Down Expand Up @@ -169,6 +281,7 @@ impl DataSplitBuilder {
bucket_path,
total_buckets: self.total_buckets,
data_files,
data_deletion_files: self.data_deletion_files,
})
}
}
Expand Down
Loading
Loading