Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions crates/integration_tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,7 @@ paimon = { path = "../paimon" }
arrow-array = { workspace = true }
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
futures = "0.3"

[dev-dependencies]
serde_json = "1"
axum = { version = "0.7", features = ["macros", "tokio", "http1", "http2"] }
164 changes: 135 additions & 29 deletions crates/integration_tests/tests/read_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,27 @@

use arrow_array::{Int32Array, RecordBatch, StringArray};
use futures::TryStreamExt;
use paimon::catalog::Identifier;
use paimon::api::ConfigResponse;
use paimon::catalog::{Identifier, RESTCatalog};
use paimon::common::Options;
use paimon::spec::{DataType, IntType, Schema, VarCharType};
use paimon::{Catalog, Error, FileSystemCatalog, Plan};
use std::collections::HashSet;
use std::collections::{HashMap, HashSet};

#[path = "../../paimon/tests/mock_server.rs"]
mod mock_server;
use mock_server::start_mock_server;

/// Resolve the warehouse directory used by the integration tests.
///
/// Reads the `PAIMON_TEST_WAREHOUSE` environment variable and falls back to
/// `/tmp/paimon-warehouse` when the variable is unset (or not valid Unicode).
fn get_test_warehouse() -> String {
    match std::env::var("PAIMON_TEST_WAREHOUSE") {
        Ok(path) => path,
        Err(_) => "/tmp/paimon-warehouse".to_string(),
    }
}

async fn scan_and_read(table_name: &str) -> (Plan, Vec<RecordBatch>) {
scan_and_read_with_projection(table_name, None).await
}

async fn scan_and_read_with_projection(
async fn scan_and_read<C: Catalog + ?Sized>(
catalog: &C,
table_name: &str,
projection: Option<&[&str]>,
) -> (Plan, Vec<RecordBatch>) {
let table = get_test_table(table_name).await;
let table = get_table_from_catalog(catalog, table_name).await;

let mut read_builder = table.new_read_builder();
if let Some(cols) = projection {
Expand All @@ -60,6 +64,30 @@ async fn scan_and_read_with_projection(
(plan, batches)
}

/// Look up `table_name` in the `default` database of the given catalog.
///
/// Panics when the table cannot be resolved, which keeps test call sites terse.
async fn get_table_from_catalog<C: Catalog + ?Sized>(
    catalog: &C,
    table_name: &str,
) -> paimon::Table {
    let id = Identifier::new("default", table_name);
    let lookup = catalog.get_table(&id).await;
    lookup.expect("Failed to get table")
}

/// Construct a `FileSystemCatalog` rooted at the test warehouse directory.
fn create_file_system_catalog() -> FileSystemCatalog {
    FileSystemCatalog::new(get_test_warehouse()).expect("Failed to create FileSystemCatalog")
}

/// Convenience wrapper: scan and read `table_name` through a freshly created
/// filesystem catalog, optionally projecting to the given column names.
async fn scan_and_read_with_fs_catalog(
    table_name: &str,
    projection: Option<&[&str]>,
) -> (Plan, Vec<RecordBatch>) {
    let fs_catalog = create_file_system_catalog();
    scan_and_read(&fs_catalog, table_name, projection).await
}

fn extract_id_name(batches: &[RecordBatch]) -> Vec<(i32, String)> {
let mut rows = Vec::new();
for batch in batches {
Expand All @@ -81,7 +109,7 @@ fn extract_id_name(batches: &[RecordBatch]) -> Vec<(i32, String)> {

#[tokio::test]
async fn test_read_log_table() {
let (plan, batches) = scan_and_read("simple_log_table").await;
let (plan, batches) = scan_and_read_with_fs_catalog("simple_log_table", None).await;

// Non-partitioned table: partition should be a valid arity=0 BinaryRow
// deserialized from manifest bytes, not a stub without backing data.
Expand All @@ -105,7 +133,7 @@ async fn test_read_log_table() {

#[tokio::test]
async fn test_read_dv_primary_key_table() {
let (_, batches) = scan_and_read("simple_dv_pk_table").await;
let (_, batches) = scan_and_read_with_fs_catalog("simple_dv_pk_table", None).await;
let actual = extract_id_name(&batches);
let expected = vec![
(1, "alice-v2".to_string()),
Expand All @@ -123,7 +151,7 @@ async fn test_read_dv_primary_key_table() {

#[tokio::test]
async fn test_read_partitioned_log_table() {
let (plan, batches) = scan_and_read("partitioned_log_table").await;
let (plan, batches) = scan_and_read_with_fs_catalog("partitioned_log_table", None).await;

let mut seen_partitions: HashSet<String> = HashSet::new();
for split in plan.splits() {
Expand Down Expand Up @@ -176,7 +204,7 @@ async fn test_read_partitioned_log_table() {

#[tokio::test]
async fn test_read_multi_partitioned_log_table() {
let (plan, batches) = scan_and_read("multi_partitioned_log_table").await;
let (plan, batches) = scan_and_read_with_fs_catalog("multi_partitioned_log_table", None).await;

let mut seen_partitions: HashSet<(String, i32)> = HashSet::new();
for split in plan.splits() {
Expand Down Expand Up @@ -244,7 +272,7 @@ async fn test_read_multi_partitioned_log_table() {

#[tokio::test]
async fn test_read_partitioned_dv_pk_table() {
let (plan, batches) = scan_and_read("partitioned_dv_pk_table").await;
let (plan, batches) = scan_and_read_with_fs_catalog("partitioned_dv_pk_table", None).await;

// Verify partition metadata on each split.
let mut seen_partitions: HashSet<String> = HashSet::new();
Expand Down Expand Up @@ -298,20 +326,10 @@ async fn test_read_partitioned_dv_pk_table() {
);
}

async fn get_test_table(table_name: &str) -> paimon::Table {
let warehouse = get_test_warehouse();
let catalog = FileSystemCatalog::new(warehouse).expect("Failed to create catalog");
let identifier = Identifier::new("default", table_name);
catalog
.get_table(&identifier)
.await
.expect("Failed to get table")
}

#[tokio::test]
async fn test_read_with_column_projection() {
let (_, batches) =
scan_and_read_with_projection("partitioned_log_table", Some(&["name", "id"])).await;
scan_and_read_with_fs_catalog("partitioned_log_table", Some(&["name", "id"])).await;

// Verify that output schema preserves caller-specified column order.
for batch in &batches {
Expand Down Expand Up @@ -340,7 +358,8 @@ async fn test_read_with_column_projection() {

#[tokio::test]
async fn test_read_projection_empty() {
let table = get_test_table("simple_log_table").await;
let catalog = create_file_system_catalog();
let table = get_table_from_catalog(&catalog, "simple_log_table").await;

let mut read_builder = table.new_read_builder();
read_builder.with_projection(&[]);
Expand Down Expand Up @@ -378,10 +397,10 @@ async fn test_read_projection_empty() {
);
}
}

#[tokio::test]
async fn test_read_projection_unknown_column() {
let table = get_test_table("simple_log_table").await;
let catalog = create_file_system_catalog();
let table = get_table_from_catalog(&catalog, "simple_log_table").await;

let mut read_builder = table.new_read_builder();
read_builder.with_projection(&["id", "nonexistent_column"]);
Expand All @@ -403,7 +422,8 @@ async fn test_read_projection_unknown_column() {

#[tokio::test]
async fn test_read_projection_all_invalid() {
let table = get_test_table("simple_log_table").await;
let catalog = create_file_system_catalog();
let table = get_table_from_catalog(&catalog, "simple_log_table").await;

let mut read_builder = table.new_read_builder();
read_builder.with_projection(&["nonexistent_a", "nonexistent_b"]);
Expand All @@ -425,7 +445,8 @@ async fn test_read_projection_all_invalid() {

#[tokio::test]
async fn test_read_projection_duplicate_column() {
let table = get_test_table("simple_log_table").await;
let catalog = create_file_system_catalog();
let table = get_table_from_catalog(&catalog, "simple_log_table").await;

let mut read_builder = table.new_read_builder();
read_builder.with_projection(&["id", "id"]);
Expand All @@ -438,3 +459,88 @@ async fn test_read_projection_duplicate_column() {
"Expected ConfigInvalid for duplicate projection, got: {err:?}"
);
}

// ======================= REST Catalog read tests ===============================

/// Build the simple test schema that matches the Spark-provisioned tables:
/// `id INT, name VARCHAR`.
fn simple_log_schema() -> Schema {
    let builder = Schema::builder()
        .column("id", DataType::Int(IntType::new()))
        .column("name", DataType::VarChar(VarCharType::string_type()));
    builder.build().expect("Failed to build schema")
}

/// Spin up a mock REST server that serves Spark-provisioned data from disk,
/// register each `(database, table, schema)` triple with it, and connect a
/// `RESTCatalog` to that server.
///
/// The returned server handle must be kept alive for the duration of the test;
/// dropping it would shut down the endpoint the catalog talks to.
async fn setup_rest_catalog_with_tables(
    table_configs: &[(&str, &str, Schema)],
) -> (mock_server::RESTServer, RESTCatalog) {
    let catalog_path = get_test_warehouse();
    // Use a simple warehouse name (no slashes) to avoid URL-encoding issues.
    let warehouse_name = "test_warehouse";

    let mut defaults = HashMap::new();
    defaults.insert("prefix".to_string(), "mock-test".to_string());

    let server = start_mock_server(
        warehouse_name.to_string(),
        catalog_path.clone(),
        ConfigResponse::new(defaults),
        vec!["default".to_string()],
    )
    .await;

    // Register each table with its schema and the real on-disk path.
    for (database, table_name, schema) in table_configs {
        let table_path = format!("{}/{}.db/{}", catalog_path, database, table_name);
        server.add_table_with_schema(database, table_name, schema.clone(), &table_path);
    }

    let url = server.url().expect("Failed to get server URL");
    let mut options = Options::new();
    options.set("uri", &url);
    options.set("warehouse", warehouse_name);
    options.set("token.provider", "bear");
    options.set("token", "test_token");

    let catalog = RESTCatalog::new(options, true)
        .await
        .expect("Failed to create RESTCatalog");

    (server, catalog)
}

/// Test reading an append-only (log) table via REST catalog backed by mock server.
///
/// The mock server returns table metadata pointing to Spark-provisioned data on disk.
#[tokio::test]
async fn test_rest_catalog_read_append_table() {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any difference between a log table and a DV table with regard to the REST catalog?
If not, I think we just need to keep only one test.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, I think you can reuse scan_and_read_with_projection. Pass a catalog trait object to it; it should work for both the filesystem catalog and the REST catalog.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it.

let table_name = "simple_log_table";
let (_server, catalog) =
setup_rest_catalog_with_tables(&[("default", table_name, simple_log_schema())]).await;

let (plan, batches) = scan_and_read(&catalog, table_name, None).await;

assert!(
!plan.splits().is_empty(),
"REST append table should have at least one split"
);

assert!(
!batches.is_empty(),
"REST append table should produce at least one batch"
);

let actual = extract_id_name(&batches);
let expected = vec![
(1, "alice".to_string()),
(2, "bob".to_string()),
(3, "carol".to_string()),
];
assert_eq!(
actual, expected,
"REST catalog append table rows should match expected values"
);
}
4 changes: 2 additions & 2 deletions crates/paimon/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ license.workspace = true
version.workspace = true

[features]
default = ["storage-memory", "storage-fs"]
default = ["storage-memory", "storage-fs", "storage-oss"]
storage-all = ["storage-memory", "storage-fs", "storage-oss", "storage-s3"]

storage-memory = ["opendal/services-memory"]
Expand All @@ -49,7 +49,7 @@ serde_with = "3.9.0"
serde_repr = "0.1"
snafu = "0.8.3"
typed-builder = "^0.19"
opendal = { version = "0.49", features = ["services-fs"] }
opendal = { version = "0.55", features = ["services-fs"] }
pretty_assertions = "1"
apache-avro = { version = "0.17", features = ["snappy", "zstandard"] }
indexmap = "2.5.0"
Expand Down
Loading
Loading