diff --git a/bindings/c/src/catalog.rs b/bindings/c/src/catalog.rs index 0a9f825d..9c5c683b 100644 --- a/bindings/c/src/catalog.rs +++ b/bindings/c/src/catalog.rs @@ -15,32 +15,56 @@ // specific language governing permissions and limitations // under the License. -use std::ffi::{c_char, c_void}; +use std::ffi::c_void; +use std::sync::Arc; use paimon::catalog::Identifier; -use paimon::{Catalog, FileSystemCatalog}; +use paimon::{Catalog, CatalogFactory, Options}; use crate::error::{check_non_null, paimon_error, validate_cstr}; use crate::result::{paimon_result_catalog_new, paimon_result_get_table}; use crate::runtime; -use crate::types::{paimon_catalog, paimon_table}; +use crate::types::{paimon_catalog, paimon_option, paimon_table}; -/// Create a new FileSystemCatalog. +/// Create a catalog using CatalogFactory with the given options. /// /// # Safety -/// `warehouse` must be a valid null-terminated C string, or null (returns error). +/// `options` must be a valid pointer to an array of `paimon_option` with `options_len` elements. +/// Each key and value in the options must be valid null-terminated C strings. 
#[no_mangle] -pub unsafe extern "C" fn paimon_catalog_new(warehouse: *const c_char) -> paimon_result_catalog_new { - let warehouse_str = match validate_cstr(warehouse, "warehouse") { - Ok(s) => s, - Err(e) => { - return paimon_result_catalog_new { - catalog: std::ptr::null_mut(), - error: e, - } +pub unsafe extern "C" fn paimon_catalog_create( + options: *const paimon_option, + options_len: usize, +) -> paimon_result_catalog_new { + // Build Options from the array + let mut opts = Options::new(); + if !options.is_null() && options_len > 0 { + let options_slice = std::slice::from_raw_parts(options, options_len); + for opt in options_slice { + let key = match validate_cstr(opt.key, "option key") { + Ok(s) => s, + Err(e) => { + return paimon_result_catalog_new { + catalog: std::ptr::null_mut(), + error: e, + } + } + }; + let value = match validate_cstr(opt.value, "option value") { + Ok(s) => s, + Err(e) => { + return paimon_result_catalog_new { + catalog: std::ptr::null_mut(), + error: e, + } + } + }; + opts.set(key, value); } - }; - match FileSystemCatalog::new(warehouse_str) { + } + + // Create catalog using CatalogFactory + match runtime().block_on(CatalogFactory::create(opts)) { Ok(catalog) => { let wrapper = Box::new(paimon_catalog { inner: Box::into_raw(Box::new(catalog)) as *mut c_void, @@ -60,13 +84,13 @@ pub unsafe extern "C" fn paimon_catalog_new(warehouse: *const c_char) -> paimon_ /// Free a paimon_catalog. /// /// # Safety -/// Only call with a catalog returned from `paimon_catalog_new`. +/// Only call with a catalog returned from `paimon_catalog_create`. 
#[no_mangle] pub unsafe extern "C" fn paimon_catalog_free(catalog: *mut paimon_catalog) { if !catalog.is_null() { let c = Box::from_raw(catalog); if !c.inner.is_null() { - drop(Box::from_raw(c.inner as *mut FileSystemCatalog)); + drop(Box::from_raw(c.inner as *mut Arc<dyn Catalog>)); } } } @@ -93,7 +117,7 @@ pub unsafe extern "C" fn paimon_catalog_get_table( }; } - let catalog_ref = &*((*catalog).inner as *const FileSystemCatalog); + let catalog_ref = &*((*catalog).inner as *const Arc<dyn Catalog>); let identifier_ref = &*((*identifier).inner as *const Identifier); match runtime().block_on(catalog_ref.get_table(identifier_ref)) { diff --git a/bindings/c/src/types.rs b/bindings/c/src/types.rs index a95f9654..6f090428 100644 --- a/bindings/c/src/types.rs +++ b/bindings/c/src/types.rs @@ -20,6 +20,13 @@ use std::ffi::c_void; use paimon::spec::DataField; use paimon::table::Table; +/// C-compatible key-value pair for options. +#[repr(C)] +pub struct paimon_option { + pub key: *const std::ffi::c_char, + pub value: *const std::ffi::c_char, +} + /// C-compatible byte buffer. #[repr(C)] #[derive(Clone, Copy)] diff --git a/bindings/go/catalog.go b/bindings/go/catalog.go index b82035d3..9ebba9db 100644 --- a/bindings/go/catalog.go +++ b/bindings/go/catalog.go @@ -27,7 +27,7 @@ import ( "github.com/jupiterrider/ffi" ) -// Catalog wraps a paimon FileSystemCatalog. +// Catalog wraps a paimon Catalog. type Catalog struct { ctx context.Context lib *libRef @@ -35,14 +35,22 @@ type Catalog struct { closeOnce sync.Once } -// NewFileSystemCatalog creates a new FileSystemCatalog for the given warehouse path. -func NewFileSystemCatalog(warehouse string) (*Catalog, error) { +// NewCatalog creates a new Catalog using the CatalogFactory with the given options. +// The catalog type is determined by the "metastore" option (default: "filesystem"). 
+// +// Common options: +// - "warehouse": The warehouse path (required) +// - "metastore": Catalog type - "filesystem" (default) or "rest" +// - "uri": REST catalog server URI (required for REST catalog) +// - "s3.access-key-id", "s3.secret-access-key", "s3.region": S3 credentials +// - "fs.oss.accessKeyId", "fs.oss.accessKeySecret", "fs.oss.endpoint": OSS credentials +func NewCatalog(options map[string]string) (*Catalog, error) { ctx, lib, err := ensureLoaded() if err != nil { return nil, err } - createFn := ffiCatalogNew.symbol(ctx) - inner, err := createFn(warehouse) + createFn := ffiCatalogCreate.symbol(ctx) + inner, err := createFn(options) if err != nil { return nil, err } @@ -80,20 +88,41 @@ func (c *Catalog) GetTable(id Identifier) (*Table, error) { return &Table{ctx: c.ctx, lib: c.lib, inner: inner}, nil } -var ffiCatalogNew = newFFI(ffiOpts{ - sym: "paimon_catalog_new", +var ffiCatalogCreate = newFFI(ffiOpts{ + sym: "paimon_catalog_create", rType: &typeResultCatalogNew, - aTypes: []*ffi.Type{&ffi.TypePointer}, -}, func(ctx context.Context, ffiCall ffiCall) func(warehouse string) (*paimonCatalog, error) { - return func(warehouse string) (*paimonCatalog, error) { - byteWarehouse, err := bytePtrFromString(warehouse) - if err != nil { - return nil, err + aTypes: []*ffi.Type{&ffi.TypePointer, &ffi.TypePointer}, +}, func(ctx context.Context, ffiCall ffiCall) func(options map[string]string) (*paimonCatalog, error) { + return func(options map[string]string) (*paimonCatalog, error) { + // Convert map to array of paimonOption + type paimonOption struct { + key *byte + value *byte } + opts := make([]paimonOption, 0, len(options)) + for k, v := range options { + keyBytes, err := bytePtrFromString(k) + if err != nil { + return nil, err + } + valBytes, err := bytePtrFromString(v) + if err != nil { + return nil, err + } + opts = append(opts, paimonOption{key: keyBytes, value: valBytes}) + } + + var optsPtr unsafe.Pointer + if len(opts) > 0 { + optsPtr = 
unsafe.Pointer(&opts[0]) + } + optsLen := uintptr(len(opts)) + var result resultCatalogNew ffiCall( unsafe.Pointer(&result), - unsafe.Pointer(&byteWarehouse), + unsafe.Pointer(&optsPtr), + unsafe.Pointer(&optsLen), ) if result.error != nil { return nil, parseError(ctx, result.error) diff --git a/bindings/go/paimon.go b/bindings/go/paimon.go index ac9ac272..bfffba02 100644 --- a/bindings/go/paimon.go +++ b/bindings/go/paimon.go @@ -28,13 +28,34 @@ // // Basic usage: // -// catalog, err := paimon.NewFileSystemCatalog("/path/to/warehouse") +// // Create a catalog with options +// catalog, err := paimon.NewCatalog(map[string]string{ +// "warehouse": "/path/to/warehouse", +// }) // if err != nil { log.Fatal(err) } // defer catalog.Close() // // table, err := catalog.GetTable(paimon.NewIdentifier("default", "my_table")) // if err != nil { log.Fatal(err) } // defer table.Close() +// +// For S3 or OSS warehouses, pass the appropriate credentials: +// +// // S3 +// catalog, _ := paimon.NewCatalog(map[string]string{ +// "warehouse": "s3://bucket/warehouse", +// "s3.access-key-id": "...", +// "s3.secret-access-key": "...", +// "s3.region": "us-east-1", +// }) +// +// // OSS +// catalog, _ := paimon.NewCatalog(map[string]string{ +// "warehouse": "oss://bucket/warehouse", +// "fs.oss.accessKeyId": "...", +// "fs.oss.accessKeySecret": "...", +// "fs.oss.endpoint": "oss-cn-hangzhou.aliyuncs.com", +// }) package paimon import ( diff --git a/bindings/go/tests/paimon_test.go b/bindings/go/tests/paimon_test.go index ef38c2be..e83f1511 100644 --- a/bindings/go/tests/paimon_test.go +++ b/bindings/go/tests/paimon_test.go @@ -45,7 +45,10 @@ func TestReadLogTable(t *testing.T) { t.Skipf("Skipping: warehouse %s does not exist (run 'make docker-up' first)", warehouse) } - catalog, err := paimon.NewFileSystemCatalog(warehouse) + // Use NewCatalog with options + catalog, err := paimon.NewCatalog(map[string]string{ + "warehouse": warehouse, + }) if err != nil { t.Fatalf("Failed to create 
catalog: %v", err) } @@ -167,7 +170,10 @@ func TestReadWithProjection(t *testing.T) { t.Skipf("Skipping: warehouse %s does not exist (run 'make docker-up' first)", warehouse) } - catalog, err := paimon.NewFileSystemCatalog(warehouse) + // Use NewCatalog with options + catalog, err := paimon.NewCatalog(map[string]string{ + "warehouse": warehouse, + }) if err != nil { t.Fatalf("Failed to create catalog: %v", err) } diff --git a/crates/integration_tests/tests/read_tables.rs b/crates/integration_tests/tests/read_tables.rs index ec96653f..9d7e8918 100644 --- a/crates/integration_tests/tests/read_tables.rs +++ b/crates/integration_tests/tests/read_tables.rs @@ -23,7 +23,7 @@ use paimon::api::ConfigResponse; use paimon::catalog::{Identifier, RESTCatalog}; use paimon::common::Options; use paimon::spec::{DataType, IntType, Predicate, Schema, VarCharType}; -use paimon::{Catalog, Error, FileSystemCatalog, Plan}; +use paimon::{Catalog, CatalogOptions, Error, FileSystemCatalog, Plan}; use std::collections::{HashMap, HashSet}; #[path = "../../paimon/tests/mock_server.rs"] @@ -77,7 +77,9 @@ async fn get_table_from_catalog( fn create_file_system_catalog() -> FileSystemCatalog { let warehouse = get_test_warehouse(); - FileSystemCatalog::new(warehouse).expect("Failed to create FileSystemCatalog") + let mut options = Options::new(); + options.set(CatalogOptions::WAREHOUSE, warehouse); + FileSystemCatalog::new(options).expect("Failed to create FileSystemCatalog") } async fn scan_and_read_with_fs_catalog( @@ -801,7 +803,7 @@ async fn setup_rest_catalog_with_tables( // Register each table with its schema and the real on-disk path for (database, table_name, schema) in table_configs { - let table_path = format!("{}/{}.db/{}", catalog_path, database, table_name); + let table_path = format!("{catalog_path}/{database}.db/{table_name}"); server.add_table_with_schema(database, table_name, schema.clone(), &table_path); } diff --git a/crates/integrations/datafusion/tests/read_tables.rs 
b/crates/integrations/datafusion/tests/read_tables.rs index dd1cad76..2ac12dc7 100644 --- a/crates/integrations/datafusion/tests/read_tables.rs +++ b/crates/integrations/datafusion/tests/read_tables.rs @@ -20,16 +20,22 @@ use std::sync::Arc; use datafusion::arrow::array::{Int32Array, StringArray}; use datafusion::prelude::SessionContext; use paimon::catalog::Identifier; -use paimon::{Catalog, FileSystemCatalog}; +use paimon::{Catalog, CatalogOptions, FileSystemCatalog, Options}; use paimon_datafusion::PaimonTableProvider; fn get_test_warehouse() -> String { std::env::var("PAIMON_TEST_WAREHOUSE").unwrap_or_else(|_| "/tmp/paimon-warehouse".to_string()) } -async fn create_context(table_name: &str) -> SessionContext { +fn create_catalog() -> FileSystemCatalog { let warehouse = get_test_warehouse(); - let catalog = FileSystemCatalog::new(warehouse).expect("Failed to create catalog"); + let mut options = Options::new(); + options.set(CatalogOptions::WAREHOUSE, warehouse); + FileSystemCatalog::new(options).expect("Failed to create catalog") +} + +async fn create_context(table_name: &str) -> SessionContext { + let catalog = create_catalog(); let identifier = Identifier::new("default", table_name); let table = catalog .get_table(&identifier) @@ -165,8 +171,7 @@ async fn test_scan_partition_count_respects_session_config() { use datafusion::datasource::TableProvider; use datafusion::prelude::SessionConfig; - let warehouse = get_test_warehouse(); - let catalog = FileSystemCatalog::new(warehouse).expect("Failed to create catalog"); + let catalog = create_catalog(); let identifier = Identifier::new("default", "partitioned_log_table"); let table = catalog .get_table(&identifier) diff --git a/crates/paimon/examples/rest_catalog_example.rs b/crates/paimon/examples/rest_catalog_example.rs index 1e0d6fa5..e03f2692 100644 --- a/crates/paimon/examples/rest_catalog_example.rs +++ b/crates/paimon/examples/rest_catalog_example.rs @@ -17,7 +17,7 @@ //! 
Example: REST Catalog Operations (Complete) //! -//! This example demonstrates how to use `RESTCatalog` for: +//! This example demonstrates how to use `CatalogFactory` to create a REST catalog and: //! 1. Database operations (create, list, get, drop) //! 2. Table operations (create, list, get, rename, drop) //! 3. Data reading from append-only tables @@ -37,9 +37,10 @@ use std::collections::HashMap; use futures::TryStreamExt; -use paimon::catalog::{Catalog, Identifier, RESTCatalog}; +use paimon::catalog::Identifier; use paimon::common::{CatalogOptions, Options}; use paimon::spec::{DataType, IntType, Schema, VarCharType}; +use paimon::CatalogFactory; /// Create a simple test schema with `id` (INT) and `name` (VARCHAR) columns. fn create_test_schema() -> Schema { @@ -101,10 +102,10 @@ async fn main() { // ==================== Create RESTCatalog ==================== println!("Creating RESTCatalog instance..."); - let catalog = match RESTCatalog::new(options, true).await { + let catalog = match CatalogFactory::create(options).await { Ok(catalog) => catalog, Err(err) => { - eprintln!("Failed to create RESTCatalog: {}", err); + eprintln!("Failed to create catalog: {err}"); return; } }; @@ -116,11 +117,11 @@ async fn main() { println!("Listing databases..."); match catalog.list_databases().await { Ok(databases) => { - println!("Databases found: {:?}", databases); + println!("Databases found: {databases:?}"); println!("Total count: {}", databases.len()); } Err(err) => { - eprintln!("Failed to list databases: {}", err); + eprintln!("Failed to list databases: {err}"); } } @@ -131,14 +132,14 @@ async fn main() { .await { Ok(()) => println!("Database created successfully"), - Err(err) => eprintln!("Failed to create database: {}", err), + Err(err) => eprintln!("Failed to create database: {err}"), } // Get database info println!("\nGetting database info for 'example_db'..."); match catalog.get_database("example_db").await { - Ok(database) => println!("Database: {:?}", database), 
- Err(err) => eprintln!("Failed to get database: {}", err), + Ok(database) => println!("Database: {database:?}"), + Err(err) => eprintln!("Failed to get database: {err}"), } // ==================== Part 2: Table Operations ==================== @@ -146,46 +147,43 @@ async fn main() { // Create table let table_identifier = Identifier::new("example_db", "users"); - println!("Creating table '{}'...", table_identifier); + println!("Creating table '{table_identifier}'..."); let schema = create_test_schema(); match catalog.create_table(&table_identifier, schema, false).await { Ok(()) => println!("Table created successfully"), - Err(err) => eprintln!("Failed to create table: {}", err), + Err(err) => eprintln!("Failed to create table: {err}"), } // List tables println!("\nListing tables in 'example_db'..."); match catalog.list_tables("example_db").await { Ok(tables) => { - println!("Tables found: {:?}", tables); + println!("Tables found: {tables:?}"); } Err(err) => { - eprintln!("Failed to list tables: {}", err); + eprintln!("Failed to list tables: {err}"); } } // Get table info - println!("\nGetting table info for '{}'...", table_identifier); + println!("\nGetting table info for '{table_identifier}'..."); match catalog.get_table(&table_identifier).await { Ok(table) => { println!("Table location: {}", table.location()); println!("Table schema fields: {:?}", table.schema().fields()); } - Err(err) => eprintln!("Failed to get table: {}", err), + Err(err) => eprintln!("Failed to get table: {err}"), } // Rename table let renamed_identifier = Identifier::new("example_db", "users_renamed"); - println!( - "\nRenaming table '{}' to '{}'...", - table_identifier, renamed_identifier - ); + println!("\nRenaming table '{table_identifier}' to '{renamed_identifier}'..."); match catalog .rename_table(&table_identifier, &renamed_identifier, false) .await { Ok(()) => println!("Table renamed successfully"), - Err(err) => eprintln!("Failed to rename table: {}", err), + Err(err) => 
eprintln!("Failed to rename table: {err}"), } // ==================== Part 3: Read Data from Existing Table ==================== @@ -194,10 +192,7 @@ async fn main() { // Try to read from an existing table (example_db.users_renamed) // This table must already exist on the REST catalog server let read_table_identifier = Identifier::new("example_db", "users_renamed"); - println!( - "Attempting to read from table '{}'...", - read_table_identifier - ); + println!("Attempting to read from table '{read_table_identifier}'..."); match catalog.get_table(&read_table_identifier).await { Ok(table) => { @@ -257,28 +252,27 @@ async fn main() { } println!("\n=== Read Summary ==="); - println!("Total rows read: {}", total_rows); + println!("Total rows read: {total_rows}"); println!("Total batches: {}", batches.len()); } Err(err) => { - eprintln!("Failed to create arrow stream: {}", err); + eprintln!("Failed to create arrow stream: {err}"); } }, Err(err) => { - eprintln!("Failed to create table read: {}", err); + eprintln!("Failed to create table read: {err}"); } } } } Err(err) => { - eprintln!("Failed to plan scan: {}", err); + eprintln!("Failed to plan scan: {err}"); } } } Err(err) => { eprintln!( - "Failed to get table '{}' (this is expected if the table doesn't exist): {}", - read_table_identifier, err + "Failed to get table '{read_table_identifier}' (this is expected if the table doesn't exist): {err}" ); } } @@ -286,16 +280,16 @@ async fn main() { // ==================== Cleanup ==================== println!("\n=== Cleanup ===\n"); // Drop table - println!("\nDropping table '{}'...", renamed_identifier); + println!("\nDropping table '{renamed_identifier}'..."); match catalog.drop_table(&renamed_identifier, false).await { Ok(()) => println!("Table dropped successfully"), - Err(err) => eprintln!("Failed to drop table: {}", err), + Err(err) => eprintln!("Failed to drop table: {err}"), } // Drop database (cascade = true to force drop even if not empty) println!("Dropping database 
'example_db'..."); match catalog.drop_database("example_db", false, true).await { Ok(()) => println!("Database dropped successfully"), - Err(err) => eprintln!("Failed to drop database: {}", err), + Err(err) => eprintln!("Failed to drop database: {err}"), } println!("\nExample completed!"); diff --git a/crates/paimon/src/catalog/factory.rs b/crates/paimon/src/catalog/factory.rs new file mode 100644 index 00000000..7e8993c5 --- /dev/null +++ b/crates/paimon/src/catalog/factory.rs @@ -0,0 +1,156 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Catalog factory for creating catalogs based on configuration options. +//! +//! This module provides a factory pattern for creating different catalog types +//! (filesystem, REST, etc.) based on the `metastore` option. +//! +//! # Example +//! +//! ```ignore +//! use std::collections::HashMap; +//! use paimon::{CatalogFactory, Options}; +//! +//! // Create a filesystem catalog +//! let mut options = Options::new(); +//! options.set("warehouse", "/path/to/warehouse"); +//! let catalog = CatalogFactory::create(options).await?; +//! +//! // Create a REST catalog +//! let mut options = Options::new(); +//! options.set("metastore", "rest"); +//! 
options.set("warehouse", "my_warehouse"); +//! let catalog = CatalogFactory::create(options).await?; +//! ``` + +use std::sync::Arc; + +use crate::catalog::{Catalog, FileSystemCatalog, RESTCatalog}; +use crate::common::{CatalogOptions, Options}; +use crate::error::{ConfigInvalidSnafu, Result}; + +/// Supported catalog types. +const METASTORE_FILESYSTEM: &str = "filesystem"; +const METASTORE_REST: &str = "rest"; + +/// Factory for creating Paimon catalogs. +/// +/// The factory determines the catalog type based on the `metastore` option: +/// - `"filesystem"` (default): Creates a [`FileSystemCatalog`] +/// - `"rest"`: Creates a [`RESTCatalog`] +/// +/// # Example +/// +/// ```ignore +/// use paimon::{CatalogFactory, Options}; +/// +/// let mut options = Options::new(); +/// options.set("warehouse", "/path/to/warehouse"); +/// let catalog = CatalogFactory::create(options).await?; +/// ``` +pub struct CatalogFactory; + +impl CatalogFactory { + /// Create a catalog based on the provided options. + /// + /// The catalog type is determined by the `metastore` option: + /// - `"filesystem"` (default): Creates a filesystem-based catalog + /// - `"rest"`: Creates a REST-based catalog + /// + /// # Arguments + /// * `options` - Configuration options containing warehouse path, URI, etc. + /// + /// # Returns + /// An `Arc<dyn Catalog>` that can be used for database and table operations. + /// + /// # Errors + /// - Returns an error if required options are missing + /// - Returns an error if the metastore type is unknown + pub async fn create(options: Options) -> Result<Arc<dyn Catalog>> { + let metastore = options + .get(CatalogOptions::METASTORE) + .map(|s| s.as_str()) + .unwrap_or(METASTORE_FILESYSTEM); + + match metastore { + METASTORE_FILESYSTEM => Self::create_filesystem_catalog(options), + METASTORE_REST => Self::create_rest_catalog(options).await, + _ => ConfigInvalidSnafu { + message: format!( + "Unknown metastore type: '{metastore}'. 
Available types: {METASTORE_FILESYSTEM}, {METASTORE_REST}" + ), + } + .fail(), + } + } + + /// Create a filesystem catalog. + fn create_filesystem_catalog(options: Options) -> Result<Arc<dyn Catalog>> { + let catalog = FileSystemCatalog::new(options)?; + Ok(Arc::new(catalog)) + } + + /// Create a REST catalog. + async fn create_rest_catalog(options: Options) -> Result<Arc<dyn Catalog>> { + let catalog = RESTCatalog::new(options, true).await?; + Ok(Arc::new(catalog)) + } +} + +#[cfg(test)] +#[cfg(not(windows))] // Skip on Windows due to path compatibility issues +mod tests { + use super::*; + + #[tokio::test] + async fn test_create_filesystem_catalog() { + let mut options = Options::new(); + options.set(CatalogOptions::WAREHOUSE, "/tmp/test-warehouse"); + + let result = CatalogFactory::create(options).await; + assert!(result.is_ok()); + } + + #[tokio::test] + async fn test_create_filesystem_catalog_explicit() { + let mut options = Options::new(); + options.set(CatalogOptions::METASTORE, "filesystem"); + options.set(CatalogOptions::WAREHOUSE, "/tmp/test-warehouse"); + + let result = CatalogFactory::create(options).await; + assert!(result.is_ok()); + } + + #[tokio::test] + async fn test_missing_warehouse_option() { + let options = Options::new(); + let result = CatalogFactory::create(options).await; + assert!(result.is_err()); + } + + #[tokio::test] + async fn test_unknown_metastore_type() { + let mut options = Options::new(); + options.set(CatalogOptions::METASTORE, "unknown"); + options.set(CatalogOptions::WAREHOUSE, "/tmp/test"); + + let result = CatalogFactory::create(options).await; + assert!(result.is_err()); + } +} diff --git a/crates/paimon/src/catalog/filesystem.rs b/crates/paimon/src/catalog/filesystem.rs index 61d6e8d3..bf0ffec1 100644 --- a/crates/paimon/src/catalog/filesystem.rs +++ b/crates/paimon/src/catalog/filesystem.rs @@ -22,13 +22,15 @@ use std::collections::HashMap; use crate::catalog::{Catalog, Database, Identifier, DB_LOCATION_PROP, DB_SUFFIX}; -use crate::error::{Error, 
Result}; +use crate::common::{CatalogOptions, Options}; +use crate::error::{ConfigInvalidSnafu, Error, Result}; use crate::io::FileIO; use crate::spec::{Schema, TableSchema}; use crate::table::Table; use async_trait::async_trait; use bytes::Bytes; use opendal::raw::get_basename; +use snafu::OptionExt; /// Name of the schema directory under each table path. const SCHEMA_DIR: &str = "schema"; @@ -67,16 +69,44 @@ pub struct FileSystemCatalog { #[allow(dead_code)] impl FileSystemCatalog { - /// Create a new filesystem catalog. + /// Create a new filesystem catalog from configuration options. /// /// # Arguments - /// * `warehouse` - The root warehouse path - pub fn new(warehouse: impl Into<String>) -> crate::Result<Self> { - let warehouse = warehouse.into(); - Ok(Self { - file_io: FileIO::from_path(warehouse.as_str())?.build()?, - warehouse, - }) + /// * `options` - Configuration options containing warehouse path and storage configs (S3, OSS, etc.) + /// + /// # Required Options + /// * `warehouse` - The root warehouse path (e.g., `/path/to/warehouse`, `s3://bucket/warehouse`) + /// + /// # Example + /// ```ignore + /// use paimon::{FileSystemCatalog, Options, CatalogOptions}; + /// + /// // Local filesystem + /// let mut options = Options::new(); + /// options.set(CatalogOptions::WAREHOUSE, "/tmp/warehouse"); + /// let catalog = FileSystemCatalog::new(options)?; + /// + /// // S3 with credentials + /// let mut options = Options::new(); + /// options.set(CatalogOptions::WAREHOUSE, "s3://bucket/warehouse"); + /// options.set("s3.access-key-id", "..."); + /// options.set("s3.secret-access-key", "..."); + /// let catalog = FileSystemCatalog::new(options)?; + /// ``` + pub fn new(options: Options) -> crate::Result<Self> { + let warehouse = + options + .get(CatalogOptions::WAREHOUSE) + .cloned() + .context(ConfigInvalidSnafu { + message: format!("Missing required option: {}", CatalogOptions::WAREHOUSE), + })?; + + let file_io = FileIO::from_path(&warehouse)? 
+ .with_props(options.to_map().iter()) + .build()?; + + Ok(Self { file_io, warehouse }) } /// Get the warehouse path. @@ -424,7 +454,9 @@ mod tests { fn create_test_catalog() -> (TempDir, FileSystemCatalog) { let temp_dir = TempDir::new().unwrap(); let warehouse = temp_dir.path().to_str().unwrap().to_string(); - let catalog = FileSystemCatalog::new(warehouse).unwrap(); + let mut options = Options::new(); + options.set(CatalogOptions::WAREHOUSE, warehouse); + let catalog = FileSystemCatalog::new(options).unwrap(); (temp_dir, catalog) } diff --git a/crates/paimon/src/catalog/mod.rs b/crates/paimon/src/catalog/mod.rs index 1b4bf574..0be4ec48 100644 --- a/crates/paimon/src/catalog/mod.rs +++ b/crates/paimon/src/catalog/mod.rs @@ -21,6 +21,7 @@ //! and follows API patterns from Apache Iceberg Rust. mod database; +mod factory; mod filesystem; mod rest; @@ -28,6 +29,7 @@ use std::collections::HashMap; use std::fmt; pub use database::*; +pub use factory::*; pub use filesystem::*; pub use rest::*; use serde::{Deserialize, Serialize}; diff --git a/crates/paimon/src/lib.rs b/crates/paimon/src/lib.rs index 8279aac9..64a74a38 100644 --- a/crates/paimon/src/lib.rs +++ b/crates/paimon/src/lib.rs @@ -34,6 +34,7 @@ pub mod spec; pub mod table; pub use catalog::Catalog; +pub use catalog::CatalogFactory; pub use catalog::FileSystemCatalog; pub use table::{ diff --git a/crates/paimon/tests/mock_server.rs b/crates/paimon/tests/mock_server.rs index e5e97505..4ff74480 100644 --- a/crates/paimon/tests/mock_server.rs +++ b/crates/paimon/tests/mock_server.rs @@ -593,7 +593,7 @@ impl RESTServer { ) }); - let key = format!("{}.{}", database, table); + let key = format!("{database}.{table}"); s.tables.insert( key, GetTableResponse::new( diff --git a/crates/paimon/tests/rest_catalog_test.rs b/crates/paimon/tests/rest_catalog_test.rs index 7ac8536b..958d50bd 100644 --- a/crates/paimon/tests/rest_catalog_test.rs +++ b/crates/paimon/tests/rest_catalog_test.rs @@ -97,7 +97,7 @@ async fn 
test_catalog_create_database() { .catalog .create_database("new_db", false, HashMap::new()) .await; - assert!(result.is_ok(), "failed to create database: {:?}", result); + assert!(result.is_ok(), "failed to create database: {result:?}"); // Verify creation let dbs = ctx.catalog.list_databases().await.unwrap(); @@ -144,7 +144,7 @@ async fn test_catalog_drop_database() { // Drop database (cascade=true to skip empty check) let result = ctx.catalog.drop_database("to_drop", false, true).await; - assert!(result.is_ok(), "failed to drop database: {:?}", result); + assert!(result.is_ok(), "failed to drop database: {result:?}"); // Verify database is gone let dbs = ctx.catalog.list_databases().await.unwrap(); @@ -232,8 +232,7 @@ async fn test_catalog_list_tables_empty() { let tables = ctx.catalog.list_tables("default").await.unwrap(); assert!( tables.is_empty(), - "expected empty tables list, got: {:?}", - tables + "expected empty tables list, got: {tables:?}" ); } @@ -252,7 +251,7 @@ async fn test_catalog_get_table() { let identifier = Identifier::new("default", "my_table"); let table = ctx.catalog.get_table(&identifier).await; - assert!(table.is_ok(), "failed to get table: {:?}", table); + assert!(table.is_ok(), "failed to get table: {table:?}"); } #[tokio::test] @@ -272,7 +271,7 @@ async fn test_catalog_create_table() { let identifier = Identifier::new("default", "new_table"); let result = ctx.catalog.create_table(&identifier, schema, false).await; - assert!(result.is_ok(), "failed to create table: {:?}", result); + assert!(result.is_ok(), "failed to create table: {result:?}"); // Verify table exists let tables = ctx.catalog.list_tables("default").await.unwrap(); @@ -326,7 +325,7 @@ async fn test_catalog_drop_table() { // Drop table let result = ctx.catalog.drop_table(&identifier, false).await; - assert!(result.is_ok(), "failed to drop table: {:?}", result); + assert!(result.is_ok(), "failed to drop table: {result:?}"); // Verify table is gone let tables = 
ctx.catalog.list_tables("default").await.unwrap(); @@ -375,7 +374,7 @@ async fn test_catalog_rename_table() { // Rename table let result = ctx.catalog.rename_table(&from, &to, false).await; - assert!(result.is_ok(), "failed to rename table: {:?}", result); + assert!(result.is_ok(), "failed to rename table: {result:?}"); // Verify old table is gone and new table exists let tables = ctx.catalog.list_tables("default").await.unwrap(); diff --git a/docs/src/getting-started.md b/docs/src/getting-started.md index 6e42ae3a..dac6af9b 100644 --- a/docs/src/getting-started.md +++ b/docs/src/getting-started.md @@ -48,16 +48,51 @@ Available storage features: ## Catalog Management -`FileSystemCatalog` manages databases and tables stored on a local (or remote) filesystem. +Paimon supports multiple catalog types. The `CatalogFactory` provides a unified way to create catalogs based on configuration options. ### Create a Catalog -```rust -use paimon::FileSystemCatalog; +The `CatalogFactory` automatically determines the catalog type based on the `metastore` option: -let catalog = FileSystemCatalog::new("/tmp/paimon-warehouse")?; +```rust +use paimon::{CatalogFactory, CatalogOptions, Options}; + +// Local filesystem (no credentials needed) +let mut options = Options::new(); +options.set(CatalogOptions::WAREHOUSE, "/path/to/warehouse"); +let catalog = CatalogFactory::create(options).await?; + +// Amazon S3 +let mut options = Options::new(); +options.set(CatalogOptions::WAREHOUSE, "s3://bucket/warehouse"); +options.set("s3.access-key-id", "your-access-key-id"); +options.set("s3.secret-access-key", "your-secret-access-key"); +options.set("s3.region", "us-east-1"); +let catalog = CatalogFactory::create(options).await?; + +// Alibaba Cloud OSS +let mut options = Options::new(); +options.set(CatalogOptions::WAREHOUSE, "oss://bucket/warehouse"); +options.set("fs.oss.accessKeyId", "your-access-key-id"); +options.set("fs.oss.accessKeySecret", "your-access-key-secret"); 
+options.set("fs.oss.endpoint", "oss-cn-hangzhou.aliyuncs.com"); +let catalog = CatalogFactory::create(options).await?; + +// REST catalog +let mut options = Options::new(); +options.set(CatalogOptions::METASTORE, "rest"); +options.set(CatalogOptions::URI, "http://localhost:8080"); +options.set(CatalogOptions::WAREHOUSE, "my_warehouse"); +let catalog = CatalogFactory::create(options).await?; ``` +Supported metastore types: + +| Metastore Type | Description | +|----------------|----------------------------------| +| `filesystem` | Local or remote filesystem (default) | +| `rest` | REST catalog server | + ### Manage Databases ```rust