From fbe5c0cd5d0c6014a5629b0063b1543a6cae2a71 Mon Sep 17 00:00:00 2001 From: khustup2 Date: Wed, 11 Feb 2026 23:38:52 +0000 Subject: [PATCH 1/3] Make db creation stateless. --- cpp/CMakeLists.pg.cmake | 3 +- cpp/deeplake_pg/dl_catalog.cpp | 100 +++++++++- cpp/deeplake_pg/dl_catalog.hpp | 15 ++ cpp/deeplake_pg/extension_init.cpp | 77 ++++++++ cpp/deeplake_pg/sync_worker.cpp | 297 ++++++++++++++++++++++++++++- cpp/deeplake_pg/sync_worker.hpp | 41 ++++ 6 files changed, 525 insertions(+), 8 deletions(-) diff --git a/cpp/CMakeLists.pg.cmake b/cpp/CMakeLists.pg.cmake index fa53cf215f..a0dcc19c13 100644 --- a/cpp/CMakeLists.pg.cmake +++ b/cpp/CMakeLists.pg.cmake @@ -70,6 +70,7 @@ foreach(PG_VERSION ${PG_VERSIONS}) endif() set(PG_SERVER_INCLUDE_DIR "${postgres_INSTALL_DIR_REL_${PG_VERSION}_0}/include/server") + set(PG_INCLUDE_DIR "${postgres_INSTALL_DIR_REL_${PG_VERSION}_0}/include") set(PG_PKGLIBDIR "${postgres_INSTALL_DIR_REL_${PG_VERSION}_0}/lib") set(PG_SHAREDIR "${postgres_INSTALL_DIR_REL_${PG_VERSION}_0}/share") @@ -79,7 +80,7 @@ foreach(PG_VERSION ${PG_VERSIONS}) ) target_include_directories(${PG_LIB} - SYSTEM PRIVATE ${PG_SERVER_INCLUDE_DIR} + SYSTEM PRIVATE ${PG_SERVER_INCLUDE_DIR} ${PG_INCLUDE_DIR} PRIVATE ${indicators_INCLUDE_DIRS} ) diff --git a/cpp/deeplake_pg/dl_catalog.cpp b/cpp/deeplake_pg/dl_catalog.cpp index 090081b512..9dfa94f555 100644 --- a/cpp/deeplake_pg/dl_catalog.cpp +++ b/cpp/deeplake_pg/dl_catalog.cpp @@ -29,6 +29,7 @@ constexpr const char* k_tables_name = "tables"; constexpr const char* k_columns_name = "columns"; constexpr const char* k_indexes_name = "indexes"; constexpr const char* k_meta_name = "meta"; +constexpr const char* k_databases_name = "databases"; std::string join_path(const std::string& root, const std::string& name) { @@ -131,6 +132,7 @@ int64_t ensure_catalog(const std::string& root_path, icm::string_map<> creds) const auto columns_path = join_path(root_path, k_columns_name); const auto indexes_path = join_path(root_path, k_indexes_name); const auto meta_path = join_path(root_path, k_meta_name); + const auto databases_path = join_path(root_path, k_databases_name); try { // Build schemas for all catalog tables @@ -165,9 +167,20 @@ int64_t ensure_catalog(const std::string& root_path, icm::string_map<> creds) .add("updated_at", deeplake_core::type::generic(nd::type::scalar(nd::dtype::int64))) .set_primary_key("catalog_version"); - // Launch all 4 open_or_create operations in parallel + deeplake_api::catalog_table_schema databases_schema; + databases_schema.add("db_name", deeplake_core::type::text(codecs::compression::null)) + .add("owner", deeplake_core::type::text(codecs::compression::null)) + .add("encoding", deeplake_core::type::text(codecs::compression::null)) + .add("lc_collate", deeplake_core::type::text(codecs::compression::null)) + .add("lc_ctype", deeplake_core::type::text(codecs::compression::null)) + .add("template_db", deeplake_core::type::text(codecs::compression::null)) + .add("state", deeplake_core::type::text(codecs::compression::null)) + .add("updated_at", deeplake_core::type::generic(nd::type::scalar(nd::dtype::int64))) + .set_primary_key("db_name"); + + // Launch all 5 open_or_create operations in parallel icm::vector>> promises; - promises.reserve(4); + promises.reserve(5); promises.push_back( deeplake_api::open_or_create_catalog_table(tables_path, std::move(tables_schema), icm::string_map<>(creds))); promises.push_back( @@ -176,12 +189,14 @@ int64_t ensure_catalog(const std::string& root_path, icm::string_map<> creds) deeplake_api::open_or_create_catalog_table(indexes_path, std::move(indexes_schema), icm::string_map<>(creds))); promises.push_back( deeplake_api::open_or_create_catalog_table(meta_path, std::move(meta_schema), icm::string_map<>(creds))); + promises.push_back( + deeplake_api::open_or_create_catalog_table(databases_path, std::move(databases_schema), icm::string_map<>(creds))); // Wait for all to complete auto results = async::combine(std::move(promises)).get_future().get(); - if (results.size() != 4) { + if (results.size() != 5) { elog(ERROR, - "Failed to initialize catalog at %s: expected 4 catalog tables, got %zu", + "Failed to initialize catalog at %s: expected 5 catalog tables, got %zu", root_path.c_str(), static_cast(results.size())); } @@ -499,6 +514,83 @@ void upsert_columns(const std::string& root_path, icm::string_map<> creds, const table->upsert_many(std::move(rows)).get_future().get(); } +std::vector load_databases(const std::string& root_path, icm::string_map<> creds) +{ + std::vector out; + try { + auto table = open_catalog_table(root_path, k_databases_name, std::move(creds)); + if (!table) { + return out; + } + auto snapshot = table->read().get_future().get(); + if (snapshot.row_count() == 0) { + return out; + } + + std::unordered_map latest; + for (const auto& row : snapshot.rows()) { + auto db_name_it = row.find("db_name"); + auto owner_it = row.find("owner"); + auto encoding_it = row.find("encoding"); + auto lc_collate_it = row.find("lc_collate"); + auto lc_ctype_it = row.find("lc_ctype"); + auto template_it = row.find("template_db"); + auto state_it = row.find("state"); + auto updated_it = row.find("updated_at"); + if (db_name_it == row.end() || state_it == row.end()) { + continue; + } + + database_meta meta; + meta.db_name = deeplake_api::array_to_string(db_name_it->second); + if (owner_it != row.end()) meta.owner = deeplake_api::array_to_string(owner_it->second); + if (encoding_it != row.end()) meta.encoding = deeplake_api::array_to_string(encoding_it->second); + if (lc_collate_it != row.end()) meta.lc_collate = deeplake_api::array_to_string(lc_collate_it->second); + if (lc_ctype_it != row.end()) meta.lc_ctype = deeplake_api::array_to_string(lc_ctype_it->second); + if (template_it != row.end()) meta.template_db = deeplake_api::array_to_string(template_it->second); + meta.state = deeplake_api::array_to_string(state_it->second); + if (updated_it != row.end()) { + auto updated_vec = load_int64_vector(updated_it->second); + meta.updated_at = updated_vec.empty() ? 0 : updated_vec.front(); + } + + auto it = latest.find(meta.db_name); + if (it == latest.end() || it->second.updated_at <= meta.updated_at) { + latest[meta.db_name] = std::move(meta); + } + } + + out.reserve(latest.size()); + for (auto& [_, meta] : latest) { + if (meta.state == "ready") { + out.push_back(std::move(meta)); + } + } + return out; + } catch (const std::exception& e) { + elog(WARNING, "Failed to load catalog databases: %s", e.what()); + return out; + } catch (...) { + elog(WARNING, "Failed to load catalog databases: unknown error"); + return out; + } +} + +void upsert_database(const std::string& root_path, icm::string_map<> creds, const database_meta& meta) +{ + auto table = open_catalog_table(root_path, k_databases_name, std::move(creds)); + icm::string_map row; + row["db_name"] = nd::adapt(meta.db_name); + row["owner"] = nd::adapt(meta.owner); + row["encoding"] = nd::adapt(meta.encoding); + row["lc_collate"] = nd::adapt(meta.lc_collate); + row["lc_ctype"] = nd::adapt(meta.lc_ctype); + row["template_db"] = nd::adapt(meta.template_db); + row["state"] = nd::adapt(meta.state); + row["updated_at"] = nd::adapt(meta.updated_at == 0 ? now_ms() : meta.updated_at); + table->upsert(std::move(row)).get_future().get(); +} + int64_t get_catalog_version(const std::string& root_path, icm::string_map<> creds) { try { diff --git a/cpp/deeplake_pg/dl_catalog.hpp b/cpp/deeplake_pg/dl_catalog.hpp index 1d1556eb38..52d680242e 100644 --- a/cpp/deeplake_pg/dl_catalog.hpp +++ b/cpp/deeplake_pg/dl_catalog.hpp @@ -36,6 +36,18 @@ struct index_meta int32_t order_type = 0; }; +struct database_meta +{ + std::string db_name; // PK + std::string owner; + std::string encoding; + std::string lc_collate; + std::string lc_ctype; + std::string template_db; + std::string state; // "ready" or "dropping" + int64_t updated_at = 0; +}; + int64_t ensure_catalog(const std::string& root_path, icm::string_map<> creds); std::vector load_tables(const std::string& root_path, icm::string_map<> creds); @@ -49,6 +61,9 @@ load_tables_and_columns(const std::string& root_path, icm::string_map<> creds); void upsert_table(const std::string& root_path, icm::string_map<> creds, const table_meta& meta); void upsert_columns(const std::string& root_path, icm::string_map<> creds, const std::vector& columns); +std::vector load_databases(const std::string& root_path, icm::string_map<> creds); +void upsert_database(const std::string& root_path, icm::string_map<> creds, const database_meta& meta); + int64_t get_catalog_version(const std::string& root_path, icm::string_map<> creds); void bump_catalog_version(const std::string& root_path, icm::string_map<> creds); diff --git a/cpp/deeplake_pg/extension_init.cpp b/cpp/deeplake_pg/extension_init.cpp index f3ac72a1eb..b293f7e819 100644 --- a/cpp/deeplake_pg/extension_init.cpp +++ b/cpp/deeplake_pg/extension_init.cpp @@ -25,6 +25,7 @@ extern "C" { #include "column_statistics.hpp" #include "deeplake_executor.hpp" +#include "dl_catalog.hpp" #include "pg_deeplake.hpp" #include "pg_version_compat.h" #include "sync_worker.hpp" @@ -473,6 +474,10 @@ static void deeplake_shmem_request() // Request shared memory for table DDL lock RequestAddinShmemSpace(pg::table_ddl_lock::get_shmem_size()); RequestNamedLWLockTranche("deeplake_table_ddl", 1); + + // Request shared memory for pending extension install queue + RequestAddinShmemSpace(pg::pending_install_queue::get_shmem_size()); + RequestNamedLWLockTranche("deeplake_install_queue", 1); } static void deeplake_shmem_startup() @@ -483,6 +488,7 @@ static void deeplake_shmem_startup() pg::table_version_tracker::initialize(); pg::table_ddl_lock::initialize(); + pg::pending_install_queue::initialize(); } static void process_utility(PlannedStmt* pstmt, @@ -675,12 +681,83 @@ static void process_utility(PlannedStmt* pstmt, } } + // Pre-hook: mark database as "dropping" in S3 catalog before PostgreSQL drops it + if (IsA(pstmt->utilityStmt, DropdbStmt) && pg::stateless_enabled) { + DropdbStmt* dbstmt = (DropdbStmt*)pstmt->utilityStmt; + try { + auto root_path = pg::session_credentials::get_root_path(); + if (root_path.empty()) { + root_path = pg::utils::get_deeplake_root_directory(); + } + if (!root_path.empty()) { + auto creds = pg::session_credentials::get_credentials(); + pg::dl_catalog::database_meta db_meta; + db_meta.db_name = dbstmt->dbname; + db_meta.state = "dropping"; + pg::dl_catalog::upsert_database(root_path, creds, db_meta); + pg::dl_catalog::bump_catalog_version(root_path, creds); + elog(LOG, "pg_deeplake: marked database '%s' as dropping in catalog", dbstmt->dbname); + } + } catch (const std::exception& e) { + elog(WARNING, "pg_deeplake: failed to mark database '%s' as dropping in catalog: %s", dbstmt->dbname, e.what()); + } + } + if (prev_process_utility_hook != nullptr) { prev_process_utility_hook(pstmt, queryString, readOnlyTree, context, params, queryEnv, dest, completionTag); } else { standard_ProcessUtility(pstmt, queryString, readOnlyTree, context, params, queryEnv, dest, completionTag); } + // Post-hook: record CREATE DATABASE in S3 catalog and install extension + if (IsA(pstmt->utilityStmt, CreatedbStmt)) { + CreatedbStmt* dbstmt = (CreatedbStmt*)pstmt->utilityStmt; + + // Queue the database for async extension install by the sync worker. + // The inline PQconnectdb approach fails on PG15+ because CREATE DATABASE + // is WAL-logged/transactional and the pg_database row isn't committed yet. + pg::pending_install_queue::enqueue(dbstmt->dbname); + + // Record in S3 catalog if stateless mode is enabled + if (pg::stateless_enabled) { + try { + auto root_path = pg::session_credentials::get_root_path(); + if (root_path.empty()) { + root_path = pg::utils::get_deeplake_root_directory(); + } + if (!root_path.empty()) { + auto creds = pg::session_credentials::get_credentials(); + pg::dl_catalog::database_meta db_meta; + db_meta.db_name = dbstmt->dbname; + db_meta.state = "ready"; + + // Extract options from CREATE DATABASE statement + ListCell* lc = nullptr; + foreach (lc, dbstmt->options) { + DefElem* def = (DefElem*)lfirst(lc); + if (strcmp(def->defname, "owner") == 0) { + db_meta.owner = defGetString(def); + } else if (strcmp(def->defname, "encoding") == 0) { + db_meta.encoding = defGetString(def); + } else if (strcmp(def->defname, "lc_collate") == 0) { + db_meta.lc_collate = defGetString(def); + } else if (strcmp(def->defname, "lc_ctype") == 0) { + db_meta.lc_ctype = defGetString(def); + } else if (strcmp(def->defname, "template") == 0) { + db_meta.template_db = defGetString(def); + } + } + + pg::dl_catalog::upsert_database(root_path, creds, db_meta); + pg::dl_catalog::bump_catalog_version(root_path, creds); + elog(DEBUG1, "pg_deeplake: recorded CREATE DATABASE '%s' in catalog", dbstmt->dbname); + } + } catch (const std::exception& e) { + elog(DEBUG1, "pg_deeplake: failed to record CREATE DATABASE '%s' in catalog: %s", dbstmt->dbname, e.what()); + } + } + } + // Post-process ALTER TABLE ADD COLUMN to add column to deeplake dataset if (IsA(pstmt->utilityStmt, AlterTableStmt)) { AlterTableStmt* stmt = (AlterTableStmt*)pstmt->utilityStmt; diff --git a/cpp/deeplake_pg/sync_worker.cpp b/cpp/deeplake_pg/sync_worker.cpp index 2587d169ee..051d52fc5f 100644 --- a/cpp/deeplake_pg/sync_worker.cpp +++ b/cpp/deeplake_pg/sync_worker.cpp @@ -9,6 +9,7 @@ extern "C" { #include #include #include +#include #include #include #include @@ -32,11 +33,96 @@ extern "C" { #include "utils.hpp" #include +#include #include // GUC variables int deeplake_sync_interval_ms = 2000; // Default 2 seconds +// Forward declaration (defined in the anonymous namespace below) +namespace { bool execute_via_libpq(const char* dbname, const char* sql); } + +// ---- pending_install_queue implementation ---- + +namespace pg { + +pending_install_queue::queue_data* pending_install_queue::data_ = nullptr; + +Size pending_install_queue::get_shmem_size() +{ + Size size = MAXALIGN(sizeof(queue_data)); + size = add_size(size, mul_size(MAX_PENDING, sizeof(entry))); + return size; +} + +void pending_install_queue::initialize() +{ + bool found = false; + + LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE); + + data_ = static_cast(ShmemInitStruct( + "deeplake_pending_installs", + get_shmem_size(), + &found + )); + + if (!found) { + data_->lock = &(GetNamedLWLockTranche("deeplake_install_queue")->lock); + data_->count = 0; + memset(data_->entries, 0, MAX_PENDING * sizeof(entry)); + } + + LWLockRelease(AddinShmemInitLock); +} + +bool pending_install_queue::enqueue(const char* dbname) +{ + if (data_ == nullptr || dbname == nullptr) { + return false; + } + + LWLockAcquire(data_->lock, LW_EXCLUSIVE); + + bool ok = false; + if (data_->count < MAX_PENDING) { + strlcpy(data_->entries[data_->count].db_name, dbname, NAMEDATALEN); + data_->count++; + ok = true; + } + + LWLockRelease(data_->lock); + return ok; +} + +void pending_install_queue::drain_and_install() +{ + if (data_ == nullptr) { + return; + } + + // Copy entries under lock, then release before doing I/O + std::vector pending; + + LWLockAcquire(data_->lock, LW_EXCLUSIVE); + for (int32_t i = 0; i < data_->count; i++) { + pending.emplace_back(data_->entries[i].db_name); + } + data_->count = 0; + LWLockRelease(data_->lock); + + // Install extension via libpq (outside any lock) + for (const auto& db : pending) { + if (execute_via_libpq(db.c_str(), "CREATE EXTENSION IF NOT EXISTS pg_deeplake")) { + elog(LOG, "pg_deeplake: installed extension in database '%s' (async)", db.c_str()); + } else { + elog(WARNING, "pg_deeplake: failed to install extension in database '%s' (async)", db.c_str()); + } + } +} + +} // namespace pg + namespace { // Worker state - use sig_atomic_t for signal safety @@ -59,6 +145,165 @@ void deeplake_sync_worker_sighup(SIGNAL_ARGS) errno = save_errno; } +/** + * Execute SQL via libpq in autocommit mode (needed for CREATE DATABASE which + * cannot run inside a transaction block). + * + * Returns true on success. Treats SQLSTATE 42P04 ("duplicate_database") as success. + */ +bool execute_via_libpq(const char* dbname, const char* sql) +{ + // Build connection string using Unix socket + const char* port = GetConfigOption("port", true, false); + const char* socket_dir = GetConfigOption("unix_socket_directories", true, false); + + StringInfoData conninfo; + initStringInfo(&conninfo); + appendStringInfo(&conninfo, "dbname=%s", dbname); + if (port) { + appendStringInfo(&conninfo, " port=%s", port); + } + if (socket_dir) { + // unix_socket_directories may be comma-separated; use the first one + char* dir_copy = pstrdup(socket_dir); + char* comma = strchr(dir_copy, ','); + if (comma) *comma = '\0'; + // Trim leading/trailing whitespace + char* dir = dir_copy; + while (*dir == ' ') dir++; + appendStringInfo(&conninfo, " host=%s", dir); + pfree(dir_copy); + } + + PGconn* conn = PQconnectdb(conninfo.data); + pfree(conninfo.data); + + if (PQstatus(conn) != CONNECTION_OK) { + elog(WARNING, "pg_deeplake sync: libpq connection failed: %s", PQerrorMessage(conn)); + PQfinish(conn); + return false; + } + + PGresult* res = PQexec(conn, sql); + ExecStatusType status = PQresultStatus(res); + bool ok = (status == PGRES_COMMAND_OK || status == PGRES_TUPLES_OK); + + if (!ok) { + const char* sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE); + // 42P04 = duplicate_database - treat as success (idempotent) + if (sqlstate && strcmp(sqlstate, "42P04") == 0) { + ok = true; + } else { + elog(WARNING, "pg_deeplake sync: libpq exec failed: %s", PQerrorMessage(conn)); + } + } + + PQclear(res); + PQfinish(conn); + return ok; +} + +/** + * Sync databases from the deeplake catalog to PostgreSQL. + * + * Creates missing databases and installs pg_deeplake extension in each. + * Must be called OUTSIDE a transaction context since CREATE DATABASE + * cannot run inside a transaction block. + */ +void deeplake_sync_databases_from_catalog(const std::string& root_path, icm::string_map<> creds) +{ + auto catalog_databases = pg::dl_catalog::load_databases(root_path, creds); + + for (const auto& db : catalog_databases) { + // Skip system databases + if (db.db_name == "postgres" || db.db_name == "template0" || db.db_name == "template1") { + continue; + } + + // Check if database already exists in pg_database via libpq + // (we're outside a transaction, so use libpq to query postgres) + StringInfoData check_sql; + initStringInfo(&check_sql); + appendStringInfo(&check_sql, + "SELECT 1 FROM pg_database WHERE datname = '%s'", + db.db_name.c_str()); + + PGconn* conn = nullptr; + { + const char* port = GetConfigOption("port", true, false); + const char* socket_dir = GetConfigOption("unix_socket_directories", true, false); + + StringInfoData conninfo; + initStringInfo(&conninfo); + appendStringInfo(&conninfo, "dbname=postgres"); + if (port) appendStringInfo(&conninfo, " port=%s", port); + if (socket_dir) { + char* dir_copy = pstrdup(socket_dir); + char* comma = strchr(dir_copy, ','); + if (comma) *comma = '\0'; + char* dir = dir_copy; + while (*dir == ' ') dir++; + appendStringInfo(&conninfo, " host=%s", dir); + pfree(dir_copy); + } + conn = PQconnectdb(conninfo.data); + pfree(conninfo.data); + } + + if (PQstatus(conn) != CONNECTION_OK) { + elog(WARNING, "pg_deeplake sync: cannot check database existence: %s", PQerrorMessage(conn)); + PQfinish(conn); + pfree(check_sql.data); + continue; + } + + PGresult* res = PQexec(conn, check_sql.data); + pfree(check_sql.data); + bool exists = (PQresultStatus(res) == PGRES_TUPLES_OK && PQntuples(res) > 0); + PQclear(res); + PQfinish(conn); + + if (exists) { + continue; + } + + // Build CREATE DATABASE statement + StringInfoData create_sql; + initStringInfo(&create_sql); + appendStringInfo(&create_sql, "CREATE DATABASE %s", quote_identifier(db.db_name.c_str())); + if (!db.owner.empty()) { + appendStringInfo(&create_sql, " OWNER %s", quote_identifier(db.owner.c_str())); + } + if (!db.encoding.empty()) { + appendStringInfo(&create_sql, " ENCODING '%s'", db.encoding.c_str()); + } + if (!db.lc_collate.empty()) { + appendStringInfo(&create_sql, " LC_COLLATE '%s'", db.lc_collate.c_str()); + } + if (!db.lc_ctype.empty()) { + appendStringInfo(&create_sql, " LC_CTYPE '%s'", db.lc_ctype.c_str()); + } + if (!db.template_db.empty()) { + appendStringInfo(&create_sql, " TEMPLATE %s", quote_identifier(db.template_db.c_str())); + } + + if (execute_via_libpq("postgres", create_sql.data)) { + elog(LOG, "pg_deeplake sync: created database '%s'", db.db_name.c_str()); + + // Install pg_deeplake extension in the new database + if (execute_via_libpq(db.db_name.c_str(), "CREATE EXTENSION IF NOT EXISTS pg_deeplake")) { + elog(LOG, "pg_deeplake sync: installed extension in database '%s'", db.db_name.c_str()); + } else { + elog(WARNING, "pg_deeplake sync: failed to install extension in database '%s'", db.db_name.c_str()); + } + } else { + elog(WARNING, "pg_deeplake sync: failed to create database '%s'", db.db_name.c_str()); + } + + pfree(create_sql.data); + } +} + /** * Sync tables from the deeplake catalog to PostgreSQL. * @@ -179,12 +424,21 @@ PGDLLEXPORT void deeplake_sync_worker_main(Datum main_arg) while (!got_sigterm) { + // Process pending interrupts (including ProcSignalBarrier from DROP DATABASE) + CHECK_FOR_INTERRUPTS(); + // Handle SIGHUP - reload configuration if (got_sighup) { got_sighup = false; ProcessConfigFile(PGC_SIGHUP); } + // Variables to carry state across transaction boundaries + // (declared before goto target to avoid crossing initialization) + std::string sync_root_path; + icm::string_map<> sync_creds; + bool need_sync = false; + // Skip if stateless mode is disabled if (!pg::stateless_enabled) { goto wait_for_latch; @@ -221,10 +475,11 @@ PGDLLEXPORT void deeplake_sync_worker_main(Datum main_arg) int64_t current_version = pg::dl_catalog::get_catalog_version(root_path, creds); if (current_version != last_catalog_version) { - // Version changed - sync tables from catalog - deeplake_sync_tables_from_catalog(root_path, creds); + // Save state for database sync (which happens outside transaction) + sync_root_path = root_path; + sync_creds = creds; + need_sync = true; last_catalog_version = current_version; - elog(LOG, "pg_deeplake sync: synced tables (catalog version %ld)", current_version); } } } @@ -238,8 +493,44 @@ PGDLLEXPORT void deeplake_sync_worker_main(Datum main_arg) PopActiveSnapshot(); CommitTransactionCommand(); + + // Sync databases via libpq OUTSIDE transaction context + // (CREATE DATABASE cannot run inside a transaction block) + if (need_sync && !sync_root_path.empty()) { + try { + deeplake_sync_databases_from_catalog(sync_root_path, sync_creds); + } catch (const std::exception& e) { + elog(WARNING, "pg_deeplake sync: database sync failed: %s", e.what()); + } catch (...) { + elog(WARNING, "pg_deeplake sync: database sync failed: unknown error"); + } + + // Re-enter transaction for table sync + SetCurrentStatementStartTimestamp(); + StartTransactionCommand(); + PushActiveSnapshot(GetTransactionSnapshot()); + + PG_TRY(); + { + deeplake_sync_tables_from_catalog(sync_root_path, sync_creds); + elog(DEBUG1, "pg_deeplake sync: synced (catalog version %ld)", last_catalog_version); + } + PG_CATCH(); + { + EmitErrorReport(); + FlushErrorState(); + } + PG_END_TRY(); + + PopActiveSnapshot(); + CommitTransactionCommand(); + } + pgstat_report_stat(true); + // Drain any databases queued for async extension install + pg::pending_install_queue::drain_and_install(); + wait_for_latch: // Wait for latch or timeout (void)WaitLatch(MyLatch, diff --git a/cpp/deeplake_pg/sync_worker.hpp b/cpp/deeplake_pg/sync_worker.hpp index b598672479..6705b11a16 100644 --- a/cpp/deeplake_pg/sync_worker.hpp +++ b/cpp/deeplake_pg/sync_worker.hpp @@ -1,4 +1,45 @@ #pragma once +#ifdef __cplusplus +extern "C" { +#endif + +#include +#include +#include + +#ifdef __cplusplus +} +#endif + +#include + // GUC variables for sync worker configuration extern int deeplake_sync_interval_ms; + +namespace pg { + +class pending_install_queue { +public: + static Size get_shmem_size(); + static void initialize(); + static bool enqueue(const char* dbname); + static void drain_and_install(); + +private: + static constexpr int32_t MAX_PENDING = 64; + + struct entry { + char db_name[NAMEDATALEN]; + }; + + struct queue_data { + LWLock* lock; + int32_t count; + entry entries[FLEXIBLE_ARRAY_MEMBER]; + }; + + static queue_data* data_; +}; + +} // namespace pg From 0487a07bf89bbdf574c0946e14092103c745489e Mon Sep 17 00:00:00 2001 From: khustup2 Date: Thu, 12 Feb 2026 00:40:51 +0000 Subject: [PATCH 2/3] Added test. --- .../tests/py_tests/test_create_database.py | 479 ++++++++++++++++++ 1 file changed, 479 insertions(+) create mode 100644 postgres/tests/py_tests/test_create_database.py diff --git a/postgres/tests/py_tests/test_create_database.py b/postgres/tests/py_tests/test_create_database.py new file mode 100644 index 0000000000..9c0c20f6a6 --- /dev/null +++ b/postgres/tests/py_tests/test_create_database.py @@ -0,0 +1,479 @@ +""" +Test CREATE DATABASE with pg_deeplake extension. + +Verifies that pg_deeplake can be installed and used in newly created databases, +and that DROP DATABASE works cleanly with the extension present. + +The CREATE DATABASE post-hook queues the database name into shared memory, and +the background sync worker installs pg_deeplake on its next poll cycle (default +2s). Tests that verify this async behaviour poll pg_extension without a manual +fallback and assert the extension appears within a bounded timeout. + +Cross-instance tests verify that database entries written to the shared catalog +by one instance are picked up by the sync worker on a second instance, which +then creates the database locally and installs the extension. +""" +import asyncio +import os +import shutil +import subprocess +import tempfile +import time +import pytest +import asyncpg +from pathlib import Path + + +SECOND_INSTANCE_PORT = 5434 + + +async def connect_postgres(port=5432): + """Connect to the default postgres database.""" + user = os.environ.get("USER", "postgres") + return await asyncpg.connect( + database="postgres", + user=user, + host="localhost", + port=port, + statement_cache_size=0, + ) + + +async def connect_database(dbname, port=5432): + """Connect to a specific database.""" + user = os.environ.get("USER", "postgres") + return await asyncpg.connect( + database=dbname, + user=user, + host="localhost", + port=port, + statement_cache_size=0, + ) + + +async def ensure_extension(conn, timeout=5.0): + """Wait for pg_deeplake extension (async install by sync worker), installing manually if timeout expires.""" + import asyncio + deadline = asyncio.get_event_loop().time() + timeout + while asyncio.get_event_loop().time() < deadline: + ext = await conn.fetchval( + "SELECT extname FROM pg_extension WHERE extname = 'pg_deeplake'" + ) + if ext == "pg_deeplake": + return + await asyncio.sleep(0.3) + # Fallback: install manually if the sync worker hasn't picked it up yet + await conn.execute("CREATE EXTENSION IF NOT EXISTS pg_deeplake") + + +@pytest.mark.asyncio +async def test_create_database_auto_installs_extension(pg_server): + """ + Verify that pg_deeplake extension can be installed in a new database. + + CREATE DATABASE followed by CREATE EXTENSION IF NOT EXISTS. + The hook may auto-install the extension; if not, we install it manually. + """ + conn = await connect_postgres() + target_conn = None + try: + await conn.execute("DROP DATABASE IF EXISTS test_auto_ext_db") + await conn.execute("CREATE DATABASE test_auto_ext_db") + + target_conn = await connect_database("test_auto_ext_db") + await ensure_extension(target_conn) + + ext = await target_conn.fetchval( + "SELECT extname FROM pg_extension WHERE extname = 'pg_deeplake'" + ) + assert ext == "pg_deeplake", ( + f"Expected pg_deeplake extension to be installed, got: {ext}" + ) + finally: + if target_conn is not None: + await target_conn.close() + await conn.execute("DROP DATABASE IF EXISTS test_auto_ext_db") + await conn.close() + + +@pytest.mark.asyncio +async def test_create_database_deeplake_works(pg_server): + """ + Verify that deeplake storage works in a newly created database. + + Steps: + - CREATE DATABASE test_dl_works_db + - Connect and ensure extension is installed + - CREATE TABLE with USING deeplake + - INSERT rows, verify row count + - Cleanup + """ + conn = await connect_postgres() + target_conn = None + try: + await conn.execute("DROP DATABASE IF EXISTS test_dl_works_db") + await conn.execute("CREATE DATABASE test_dl_works_db") + + target_conn = await connect_database("test_dl_works_db") + await ensure_extension(target_conn) + + await target_conn.execute(""" + CREATE TABLE test_vectors ( + id SERIAL PRIMARY KEY, + v1 float4[] + ) USING deeplake + """) + + await target_conn.execute(""" + INSERT INTO test_vectors (v1) VALUES + (ARRAY[1.0, 2.0, 3.0]), + (ARRAY[4.0, 5.0, 6.0]), + (ARRAY[7.0, 8.0, 9.0]) + """) + + count = await target_conn.fetchval("SELECT count(*) FROM test_vectors") + assert count == 3, f"Expected 3 rows, got {count}" + + await target_conn.execute("DROP TABLE test_vectors") + finally: + if target_conn is not None: + await target_conn.close() + await conn.execute("DROP DATABASE IF EXISTS test_dl_works_db") + await conn.close() + + +@pytest.mark.asyncio +async def test_drop_database_with_extension(pg_server): + """ + Verify that DROP DATABASE succeeds on a database with pg_deeplake installed. + + Steps: + - CREATE DATABASE test_drop_db + - Connect, verify extension exists, disconnect + - DROP DATABASE -- should succeed without errors + """ + conn = await connect_postgres() + try: + await conn.execute("DROP DATABASE IF EXISTS test_drop_db") + await conn.execute("CREATE DATABASE test_drop_db") + + target_conn = await connect_database("test_drop_db") + await ensure_extension(target_conn) + + ext = await target_conn.fetchval( + "SELECT extname FROM pg_extension WHERE extname = 'pg_deeplake'" + ) + assert ext == "pg_deeplake" + await target_conn.close() + + await conn.execute("DROP DATABASE test_drop_db") + + # Verify database no longer exists + exists = await conn.fetchval( + "SELECT 1 FROM pg_database WHERE datname = 'test_drop_db'" + ) + assert exists is None, "Database should not exist after DROP" + finally: + await conn.execute("DROP DATABASE IF EXISTS test_drop_db") + await conn.close() + + +@pytest.mark.asyncio +async def test_create_multiple_databases(pg_server): + """ + Verify that pg_deeplake works across multiple newly created databases. + + Steps: + - CREATE DATABASE test_multi_db_1 and test_multi_db_2 + - Verify both have the extension installed + - Cleanup: DROP both + """ + conn = await connect_postgres() + conn1 = None + conn2 = None + try: + await conn.execute("DROP DATABASE IF EXISTS test_multi_db_1") + await conn.execute("DROP DATABASE IF EXISTS test_multi_db_2") + await conn.execute("CREATE DATABASE test_multi_db_1") + await conn.execute("CREATE DATABASE test_multi_db_2") + + conn1 = await connect_database("test_multi_db_1") + conn2 = await connect_database("test_multi_db_2") + + await ensure_extension(conn1) + await ensure_extension(conn2) + + ext1 = await conn1.fetchval( + "SELECT extname FROM pg_extension WHERE extname = 'pg_deeplake'" + ) + ext2 = await conn2.fetchval( + "SELECT extname FROM pg_extension WHERE extname = 'pg_deeplake'" + ) + + assert ext1 == "pg_deeplake", ( + f"Expected pg_deeplake in test_multi_db_1, got: {ext1}" + ) + assert ext2 == "pg_deeplake", ( + f"Expected pg_deeplake in test_multi_db_2, got: {ext2}" + ) + finally: + if conn1 is not None: + await conn1.close() + if conn2 is not None: + await conn2.close() + await conn.execute("DROP DATABASE IF EXISTS test_multi_db_1") + await conn.execute("DROP DATABASE IF EXISTS test_multi_db_2") + await conn.close() + + +# --------------------------------------------------------------------------- +# Async auto-install tests (no manual fallback — proves the sync worker path) +# --------------------------------------------------------------------------- + + +async def poll_for_extension(conn, timeout=10.0): + """Poll for pg_deeplake extension. Returns True if found, False on timeout.""" + deadline = asyncio.get_event_loop().time() + timeout + while asyncio.get_event_loop().time() < deadline: + ext = await conn.fetchval( + "SELECT extname FROM pg_extension WHERE extname = 'pg_deeplake'" + ) + if ext == "pg_deeplake": + return True + await asyncio.sleep(0.25) + return False + + +@pytest.mark.asyncio +async def test_async_extension_auto_install(pg_server): + """ + Verify the sync worker installs pg_deeplake asynchronously without manual + intervention. + + After CREATE DATABASE, the post-hook queues the database name into shared + memory. The background sync worker drains the queue and installs the + extension via libpq. This test polls WITHOUT manual fallback — if the + extension doesn't appear within the timeout, the test FAILS. + """ + conn = await connect_postgres() + target_conn = None + try: + await conn.execute("DROP DATABASE IF EXISTS test_async_install_db") + await conn.execute("CREATE DATABASE test_async_install_db") + + target_conn = await connect_database("test_async_install_db") + + installed = await poll_for_extension(target_conn) + assert installed, ( + "Sync worker should have auto-installed pg_deeplake within 10s" + ) + finally: + if target_conn is not None: + await target_conn.close() + await conn.execute("DROP DATABASE IF EXISTS test_async_install_db") + await conn.close() + + +@pytest.mark.asyncio +async def test_async_extension_auto_install_multiple(pg_server): + """ + Verify that the shared-memory queue handles multiple databases queued in + rapid succession. All should have the extension installed by the sync + worker within the timeout — no manual fallback. + """ + conn = await connect_postgres() + db_names = ["test_async_batch_1", "test_async_batch_2", "test_async_batch_3"] + conns = {} + try: + for db in db_names: + await conn.execute(f"DROP DATABASE IF EXISTS {db}") + for db in db_names: + await conn.execute(f"CREATE DATABASE {db}") + + for db in db_names: + conns[db] = await connect_database(db) + + # Poll all databases for extension + remaining = set(db_names) + deadline = asyncio.get_event_loop().time() + 10.0 + while remaining and asyncio.get_event_loop().time() < deadline: + for db in list(remaining): + ext = await conns[db].fetchval( + "SELECT extname FROM pg_extension WHERE extname = 'pg_deeplake'" + ) + if ext == "pg_deeplake": + remaining.discard(db) + if remaining: + await asyncio.sleep(0.25) + + assert not remaining, ( + f"Sync worker should have installed pg_deeplake in all databases " + f"within 10s, still missing: {remaining}" + ) + finally: + for c in conns.values(): + await c.close() + for db in db_names: + await conn.execute(f"DROP DATABASE IF EXISTS {db}") + await conn.close() + + +# --------------------------------------------------------------------------- +# Cross-instance database creation via catalog sync +# --------------------------------------------------------------------------- + + +def _run_cmd(cmd, check=True): + """Run a shell command, handling root vs non-root.""" + user = os.environ.get("USER", "postgres") + if os.geteuid() == 0: + result = subprocess.run( + ["su", "-", user, "-c", cmd], + capture_output=True, text=True, + ) + else: + result = subprocess.run(cmd, shell=True, capture_output=True, text=True) + if check and result.returncode != 0: + raise RuntimeError(f"Command failed: {cmd}\nstderr: {result.stderr}") + return result + + +@pytest.fixture +def second_pg_instance(pg_config, temp_dir_for_postgres): + """ + Start a second PostgreSQL instance with deeplake.root_path configured in + postgresql.conf so that its sync worker can discover databases from the + shared catalog. + """ + install_dir = pg_config["install"] + pg_ctl = install_dir / "bin" / "pg_ctl" + initdb = install_dir / "bin" / "initdb" + user = os.environ.get("USER", "postgres") + + tmp = Path(tempfile.mkdtemp(prefix="deeplake_second_")) + if os.geteuid() == 0: + shutil.chown(str(tmp), user=user, group=user) + os.chmod(tmp, 0o777) + + data_dir = tmp / "pg_data" + log_file = tmp / "server.log" + + # initdb + _run_cmd(f"{initdb} -D {data_dir} -U {user}") + + # Configure + with open(data_dir / "postgresql.conf", "a") as f: + f.write(f"\nport = {SECOND_INSTANCE_PORT}\n") + f.write("shared_preload_libraries = 'pg_deeplake'\n") + f.write("max_connections = 100\n") + f.write("shared_buffers = 64MB\n") + f.write(f"deeplake.root_path = '{temp_dir_for_postgres}'\n") + + # Start + lib_path = str(install_dir / "lib") + ld = f"{lib_path}:{os.environ.get('LD_LIBRARY_PATH', '')}" + if os.geteuid() == 0: + subprocess.run( + ["su", "-", user, "-c", + f"LD_LIBRARY_PATH={ld} {pg_ctl} -D {data_dir} -l {log_file} start"], + check=True, + ) + else: + env = os.environ.copy() + env["LD_LIBRARY_PATH"] = ld + subprocess.run( + [str(pg_ctl), "-D", str(data_dir), "-l", str(log_file), "start"], + check=True, env=env, + ) + time.sleep(3) + + yield { + "port": SECOND_INSTANCE_PORT, + "root_path": temp_dir_for_postgres, + "data_dir": data_dir, + "log_file": log_file, + } + + # Cleanup + _run_cmd(f"{pg_ctl} stop -D {data_dir} -m fast", check=False) + time.sleep(1) + if tmp.exists(): + shutil.rmtree(tmp) + + +@pytest.mark.asyncio +async def test_database_creation_synced_to_second_instance( + pg_server, second_pg_instance, temp_dir_for_postgres, +): + """ + Verify cross-instance database creation via the shared catalog. + + Instance A (primary, port 5432): + - SET deeplake.root_path → CREATE DATABASE → catalog records the DB. + + Instance B (secondary, port 5434): + - Has the same deeplake.root_path in postgresql.conf. + - Its sync worker reads the catalog, discovers the new database, creates + it locally via CREATE DATABASE, and installs pg_deeplake. + + The test polls Instance B for the database and extension without manual + intervention. + """ + root_path = temp_dir_for_postgres + port_b = second_pg_instance["port"] + db_name = "test_cross_instance_db" + + # --- Instance A: create the database with catalog recording --- + conn_a = await connect_postgres() + try: + await conn_a.execute(f"SET deeplake.root_path = '{root_path}'") + await conn_a.execute(f"DROP DATABASE IF EXISTS {db_name}") + await conn_a.execute(f"CREATE DATABASE {db_name}") + finally: + await conn_a.close() + + # --- Instance B: wait for the sync worker to create the database --- + deadline = asyncio.get_event_loop().time() + 15.0 + db_exists = False + while asyncio.get_event_loop().time() < deadline: + try: + conn_b = await connect_postgres(port=port_b) + exists = await conn_b.fetchval( + "SELECT 1 FROM pg_database WHERE datname = $1", db_name, + ) + await conn_b.close() + if exists: + db_exists = True + break + except Exception: + pass + await asyncio.sleep(0.5) + + assert db_exists, ( + f"Sync worker on Instance B should have created '{db_name}' " + f"within 15s via catalog sync" + ) + + # --- Instance B: verify the extension was installed in the new DB --- + target_conn = await connect_database(db_name, port=port_b) + try: + installed = await poll_for_extension(target_conn, timeout=10.0) + assert installed, ( + f"pg_deeplake should be auto-installed in '{db_name}' on Instance B" + ) + finally: + await target_conn.close() + + # --- Cleanup on both instances --- + conn_a = await connect_postgres() + try: + await conn_a.execute(f"DROP DATABASE IF EXISTS {db_name}") + finally: + await conn_a.close() + + try: + conn_b = await connect_postgres(port=port_b) + await conn_b.execute(f"DROP DATABASE IF EXISTS {db_name}") + await conn_b.close() + except Exception: + pass From 803bdbeed67744578f478e93c9b6dd98a31c7647 Mon Sep 17 00:00:00 2001 From: khustup2 Date: Thu, 12 Feb 2026 00:41:08 +0000 Subject: [PATCH 3/3] Updated deeplake-api version. --- DEEPLAKE_API_VERSION | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/DEEPLAKE_API_VERSION b/DEEPLAKE_API_VERSION index 4404a17bae..6cedcff630 100644 --- a/DEEPLAKE_API_VERSION +++ b/DEEPLAKE_API_VERSION @@ -1 +1 @@ -4.5.1 +4.5.2