Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 42 additions & 18 deletions bindings/c/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,32 +15,56 @@
// specific language governing permissions and limitations
// under the License.

use std::ffi::{c_char, c_void};
use std::ffi::c_void;
use std::sync::Arc;

use paimon::catalog::Identifier;
use paimon::{Catalog, FileSystemCatalog};
use paimon::{Catalog, CatalogFactory, Options};

use crate::error::{check_non_null, paimon_error, validate_cstr};
use crate::result::{paimon_result_catalog_new, paimon_result_get_table};
use crate::runtime;
use crate::types::{paimon_catalog, paimon_table};
use crate::types::{paimon_catalog, paimon_option, paimon_table};

/// Create a new FileSystemCatalog.
/// Create a catalog using CatalogFactory with the given options.
///
/// # Safety
/// `warehouse` must be a valid null-terminated C string, or null (returns error).
/// `options` must be a valid pointer to an array of `paimon_option` with `options_len` elements.
/// Each key and value in the options must be valid null-terminated C strings.
#[no_mangle]
pub unsafe extern "C" fn paimon_catalog_new(warehouse: *const c_char) -> paimon_result_catalog_new {
let warehouse_str = match validate_cstr(warehouse, "warehouse") {
Ok(s) => s,
Err(e) => {
return paimon_result_catalog_new {
catalog: std::ptr::null_mut(),
error: e,
}
pub unsafe extern "C" fn paimon_catalog_create(
options: *const paimon_option,
options_len: usize,
) -> paimon_result_catalog_new {
// Build Options from the array
let mut opts = Options::new();
if !options.is_null() && options_len > 0 {
let options_slice = std::slice::from_raw_parts(options, options_len);
for opt in options_slice {
let key = match validate_cstr(opt.key, "option key") {
Ok(s) => s,
Err(e) => {
return paimon_result_catalog_new {
catalog: std::ptr::null_mut(),
error: e,
}
}
};
let value = match validate_cstr(opt.value, "option value") {
Ok(s) => s,
Err(e) => {
return paimon_result_catalog_new {
catalog: std::ptr::null_mut(),
error: e,
}
}
};
opts.set(key, value);
}
};
match FileSystemCatalog::new(warehouse_str) {
}

// Create catalog using CatalogFactory
match runtime().block_on(CatalogFactory::create(opts)) {
Ok(catalog) => {
let wrapper = Box::new(paimon_catalog {
inner: Box::into_raw(Box::new(catalog)) as *mut c_void,
Expand All @@ -60,13 +84,13 @@ pub unsafe extern "C" fn paimon_catalog_new(warehouse: *const c_char) -> paimon_
/// Free a paimon_catalog.
///
/// # Safety
/// Only call with a catalog returned from `paimon_catalog_new`.
/// Only call with a catalog returned from `paimon_catalog_create`.
#[no_mangle]
pub unsafe extern "C" fn paimon_catalog_free(catalog: *mut paimon_catalog) {
if !catalog.is_null() {
let c = Box::from_raw(catalog);
if !c.inner.is_null() {
drop(Box::from_raw(c.inner as *mut FileSystemCatalog));
drop(Box::from_raw(c.inner as *mut Arc<dyn Catalog>));
}
}
}
Expand All @@ -93,7 +117,7 @@ pub unsafe extern "C" fn paimon_catalog_get_table(
};
}

let catalog_ref = &*((*catalog).inner as *const FileSystemCatalog);
let catalog_ref = &*((*catalog).inner as *const Arc<dyn Catalog>);
let identifier_ref = &*((*identifier).inner as *const Identifier);

match runtime().block_on(catalog_ref.get_table(identifier_ref)) {
Expand Down
7 changes: 7 additions & 0 deletions bindings/c/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,13 @@ use std::ffi::c_void;
use paimon::spec::DataField;
use paimon::table::Table;

/// C-compatible key-value pair for options.
#[repr(C)]
pub struct paimon_option {
pub key: *const std::ffi::c_char,
pub value: *const std::ffi::c_char,
}

/// C-compatible byte buffer.
#[repr(C)]
#[derive(Clone, Copy)]
Expand Down
57 changes: 43 additions & 14 deletions bindings/go/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,22 +27,30 @@ import (
"github.com/jupiterrider/ffi"
)

// Catalog wraps a paimon FileSystemCatalog.
// Catalog wraps a paimon Catalog.
type Catalog struct {
ctx context.Context
lib *libRef
inner *paimonCatalog
closeOnce sync.Once
}

// NewFileSystemCatalog creates a new FileSystemCatalog for the given warehouse path.
func NewFileSystemCatalog(warehouse string) (*Catalog, error) {
// NewCatalog creates a new Catalog using the CatalogFactory with the given options.
// The catalog type is determined by the "metastore" option (default: "filesystem").
//
// Common options:
// - "warehouse": The warehouse path (required)
// - "metastore": Catalog type - "filesystem" (default) or "rest"
// - "uri": REST catalog server URI (required for REST catalog)
// - "s3.access-key-id", "s3.secret-access-key", "s3.region": S3 credentials
// - "fs.oss.accessKeyId", "fs.oss.accessKeySecret", "fs.oss.endpoint": OSS credentials
func NewCatalog(options map[string]string) (*Catalog, error) {
ctx, lib, err := ensureLoaded()
if err != nil {
return nil, err
}
createFn := ffiCatalogNew.symbol(ctx)
inner, err := createFn(warehouse)
createFn := ffiCatalogCreate.symbol(ctx)
inner, err := createFn(options)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -80,20 +88,41 @@ func (c *Catalog) GetTable(id Identifier) (*Table, error) {
return &Table{ctx: c.ctx, lib: c.lib, inner: inner}, nil
}

var ffiCatalogNew = newFFI(ffiOpts{
sym: "paimon_catalog_new",
var ffiCatalogCreate = newFFI(ffiOpts{
sym: "paimon_catalog_create",
rType: &typeResultCatalogNew,
aTypes: []*ffi.Type{&ffi.TypePointer},
}, func(ctx context.Context, ffiCall ffiCall) func(warehouse string) (*paimonCatalog, error) {
return func(warehouse string) (*paimonCatalog, error) {
byteWarehouse, err := bytePtrFromString(warehouse)
if err != nil {
return nil, err
aTypes: []*ffi.Type{&ffi.TypePointer, &ffi.TypePointer},
}, func(ctx context.Context, ffiCall ffiCall) func(options map[string]string) (*paimonCatalog, error) {
return func(options map[string]string) (*paimonCatalog, error) {
// Convert map to array of paimonOption
type paimonOption struct {
key *byte
value *byte
}
opts := make([]paimonOption, 0, len(options))
for k, v := range options {
keyBytes, err := bytePtrFromString(k)
if err != nil {
return nil, err
}
valBytes, err := bytePtrFromString(v)
if err != nil {
return nil, err
}
opts = append(opts, paimonOption{key: keyBytes, value: valBytes})
}

var optsPtr unsafe.Pointer
if len(opts) > 0 {
optsPtr = unsafe.Pointer(&opts[0])
}
optsLen := uintptr(len(opts))

var result resultCatalogNew
ffiCall(
unsafe.Pointer(&result),
unsafe.Pointer(&byteWarehouse),
unsafe.Pointer(&optsPtr),
unsafe.Pointer(&optsLen),
)
if result.error != nil {
return nil, parseError(ctx, result.error)
Expand Down
23 changes: 22 additions & 1 deletion bindings/go/paimon.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,34 @@
//
// Basic usage:
//
// catalog, err := paimon.NewFileSystemCatalog("/path/to/warehouse")
// // Create a catalog with options
// catalog, err := paimon.NewCatalog(map[string]string{
// "warehouse": "/path/to/warehouse",
// })
// if err != nil { log.Fatal(err) }
// defer catalog.Close()
//
// table, err := catalog.GetTable(paimon.NewIdentifier("default", "my_table"))
// if err != nil { log.Fatal(err) }
// defer table.Close()
//
// For S3 or OSS warehouses, pass the appropriate credentials:
//
// // S3
// catalog, _ := paimon.NewCatalog(map[string]string{
// "warehouse": "s3://bucket/warehouse",
// "s3.access-key-id": "...",
// "s3.secret-access-key": "...",
// "s3.region": "us-east-1",
// })
//
// // OSS
// catalog, _ := paimon.NewCatalog(map[string]string{
// "warehouse": "oss://bucket/warehouse",
// "fs.oss.accessKeyId": "...",
// "fs.oss.accessKeySecret": "...",
// "fs.oss.endpoint": "oss-cn-hangzhou.aliyuncs.com",
// })
package paimon

import (
Expand Down
10 changes: 8 additions & 2 deletions bindings/go/tests/paimon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,10 @@ func TestReadLogTable(t *testing.T) {
t.Skipf("Skipping: warehouse %s does not exist (run 'make docker-up' first)", warehouse)
}

catalog, err := paimon.NewFileSystemCatalog(warehouse)
// Use NewCatalog with options
catalog, err := paimon.NewCatalog(map[string]string{
"warehouse": warehouse,
})
if err != nil {
t.Fatalf("Failed to create catalog: %v", err)
}
Expand Down Expand Up @@ -167,7 +170,10 @@ func TestReadWithProjection(t *testing.T) {
t.Skipf("Skipping: warehouse %s does not exist (run 'make docker-up' first)", warehouse)
}

catalog, err := paimon.NewFileSystemCatalog(warehouse)
// Use NewCatalog with options
catalog, err := paimon.NewCatalog(map[string]string{
"warehouse": warehouse,
})
if err != nil {
t.Fatalf("Failed to create catalog: %v", err)
}
Expand Down
8 changes: 5 additions & 3 deletions crates/integration_tests/tests/read_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use paimon::api::ConfigResponse;
use paimon::catalog::{Identifier, RESTCatalog};
use paimon::common::Options;
use paimon::spec::{DataType, IntType, Predicate, Schema, VarCharType};
use paimon::{Catalog, Error, FileSystemCatalog, Plan};
use paimon::{Catalog, CatalogOptions, Error, FileSystemCatalog, Plan};
use std::collections::{HashMap, HashSet};

#[path = "../../paimon/tests/mock_server.rs"]
Expand Down Expand Up @@ -77,7 +77,9 @@ async fn get_table_from_catalog<C: Catalog + ?Sized>(

fn create_file_system_catalog() -> FileSystemCatalog {
let warehouse = get_test_warehouse();
FileSystemCatalog::new(warehouse).expect("Failed to create FileSystemCatalog")
let mut options = Options::new();
options.set(CatalogOptions::WAREHOUSE, warehouse);
FileSystemCatalog::new(options).expect("Failed to create FileSystemCatalog")
}

async fn scan_and_read_with_fs_catalog(
Expand Down Expand Up @@ -801,7 +803,7 @@ async fn setup_rest_catalog_with_tables(

// Register each table with its schema and the real on-disk path
for (database, table_name, schema) in table_configs {
let table_path = format!("{}/{}.db/{}", catalog_path, database, table_name);
let table_path = format!("{catalog_path}/{database}.db/{table_name}");
server.add_table_with_schema(database, table_name, schema.clone(), &table_path);
}

Expand Down
15 changes: 10 additions & 5 deletions crates/integrations/datafusion/tests/read_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,22 @@ use std::sync::Arc;
use datafusion::arrow::array::{Int32Array, StringArray};
use datafusion::prelude::SessionContext;
use paimon::catalog::Identifier;
use paimon::{Catalog, FileSystemCatalog};
use paimon::{Catalog, CatalogOptions, FileSystemCatalog, Options};
use paimon_datafusion::PaimonTableProvider;

fn get_test_warehouse() -> String {
std::env::var("PAIMON_TEST_WAREHOUSE").unwrap_or_else(|_| "/tmp/paimon-warehouse".to_string())
}

async fn create_context(table_name: &str) -> SessionContext {
fn create_catalog() -> FileSystemCatalog {
let warehouse = get_test_warehouse();
let catalog = FileSystemCatalog::new(warehouse).expect("Failed to create catalog");
let mut options = Options::new();
options.set(CatalogOptions::WAREHOUSE, warehouse);
FileSystemCatalog::new(options).expect("Failed to create catalog")
}

async fn create_context(table_name: &str) -> SessionContext {
let catalog = create_catalog();
let identifier = Identifier::new("default", table_name);
let table = catalog
.get_table(&identifier)
Expand Down Expand Up @@ -165,8 +171,7 @@ async fn test_scan_partition_count_respects_session_config() {
use datafusion::datasource::TableProvider;
use datafusion::prelude::SessionConfig;

let warehouse = get_test_warehouse();
let catalog = FileSystemCatalog::new(warehouse).expect("Failed to create catalog");
let catalog = create_catalog();
let identifier = Identifier::new("default", "partitioned_log_table");
let table = catalog
.get_table(&identifier)
Expand Down
Loading
Loading