Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion DEEPLAKE_API_VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
4.5.1
4.5.2
3 changes: 2 additions & 1 deletion cpp/CMakeLists.pg.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ foreach(PG_VERSION ${PG_VERSIONS})
endif()

set(PG_SERVER_INCLUDE_DIR "${postgres_INSTALL_DIR_REL_${PG_VERSION}_0}/include/server")
set(PG_INCLUDE_DIR "${postgres_INSTALL_DIR_REL_${PG_VERSION}_0}/include")
set(PG_PKGLIBDIR "${postgres_INSTALL_DIR_REL_${PG_VERSION}_0}/lib")
set(PG_SHAREDIR "${postgres_INSTALL_DIR_REL_${PG_VERSION}_0}/share")

Expand All @@ -79,7 +80,7 @@ foreach(PG_VERSION ${PG_VERSIONS})
)

target_include_directories(${PG_LIB}
SYSTEM PRIVATE ${PG_SERVER_INCLUDE_DIR}
SYSTEM PRIVATE ${PG_SERVER_INCLUDE_DIR} ${PG_INCLUDE_DIR}
PRIVATE
${indicators_INCLUDE_DIRS}
)
Expand Down
100 changes: 96 additions & 4 deletions cpp/deeplake_pg/dl_catalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ constexpr const char* k_tables_name = "tables";
constexpr const char* k_columns_name = "columns";
constexpr const char* k_indexes_name = "indexes";
constexpr const char* k_meta_name = "meta";
constexpr const char* k_databases_name = "databases";

std::string join_path(const std::string& root, const std::string& name)
{
Expand Down Expand Up @@ -131,6 +132,7 @@ int64_t ensure_catalog(const std::string& root_path, icm::string_map<> creds)
const auto columns_path = join_path(root_path, k_columns_name);
const auto indexes_path = join_path(root_path, k_indexes_name);
const auto meta_path = join_path(root_path, k_meta_name);
const auto databases_path = join_path(root_path, k_databases_name);

try {
// Build schemas for all catalog tables
Expand Down Expand Up @@ -165,9 +167,20 @@ int64_t ensure_catalog(const std::string& root_path, icm::string_map<> creds)
.add("updated_at", deeplake_core::type::generic(nd::type::scalar(nd::dtype::int64)))
.set_primary_key("catalog_version");

// Launch all 4 open_or_create operations in parallel
deeplake_api::catalog_table_schema databases_schema;
databases_schema.add("db_name", deeplake_core::type::text(codecs::compression::null))
.add("owner", deeplake_core::type::text(codecs::compression::null))
.add("encoding", deeplake_core::type::text(codecs::compression::null))
.add("lc_collate", deeplake_core::type::text(codecs::compression::null))
.add("lc_ctype", deeplake_core::type::text(codecs::compression::null))
.add("template_db", deeplake_core::type::text(codecs::compression::null))
.add("state", deeplake_core::type::text(codecs::compression::null))
.add("updated_at", deeplake_core::type::generic(nd::type::scalar(nd::dtype::int64)))
.set_primary_key("db_name");

// Launch all 5 open_or_create operations in parallel
icm::vector<async::promise<std::shared_ptr<deeplake_api::catalog_table>>> promises;
promises.reserve(4);
promises.reserve(5);
promises.push_back(
deeplake_api::open_or_create_catalog_table(tables_path, std::move(tables_schema), icm::string_map<>(creds)));
promises.push_back(
Expand All @@ -176,12 +189,14 @@ int64_t ensure_catalog(const std::string& root_path, icm::string_map<> creds)
deeplake_api::open_or_create_catalog_table(indexes_path, std::move(indexes_schema), icm::string_map<>(creds)));
promises.push_back(
deeplake_api::open_or_create_catalog_table(meta_path, std::move(meta_schema), icm::string_map<>(creds)));
promises.push_back(
deeplake_api::open_or_create_catalog_table(databases_path, std::move(databases_schema), icm::string_map<>(creds)));

// Wait for all to complete
auto results = async::combine(std::move(promises)).get_future().get();
if (results.size() != 4) {
if (results.size() != 5) {
elog(ERROR,
"Failed to initialize catalog at %s: expected 4 catalog tables, got %zu",
"Failed to initialize catalog at %s: expected 5 catalog tables, got %zu",
root_path.c_str(),
static_cast<size_t>(results.size()));
}
Expand Down Expand Up @@ -499,6 +514,83 @@ void upsert_columns(const std::string& root_path, icm::string_map<> creds, const
table->upsert_many(std::move(rows)).get_future().get();
}

/// Load all "ready" databases recorded in the S3-backed catalog.
///
/// Reads every row of the "databases" catalog table, deduplicates by
/// db_name keeping the row with the newest updated_at, and returns only
/// entries whose state is "ready" (rows marked "dropping" are filtered out).
///
/// @param root_path  catalog root; the databases table lives under it.
/// @param creds      storage credentials, consumed by the table open.
/// @return the surviving database_meta rows; empty on any failure
///         (errors are downgraded to a WARNING, never thrown to the caller).
std::vector<database_meta> load_databases(const std::string& root_path, icm::string_map<> creds)
{
    std::vector<database_meta> out;
    try {
        auto table = open_catalog_table(root_path, k_databases_name, std::move(creds));
        if (!table) {
            return out;
        }
        auto snapshot = table->read().get_future().get();
        if (snapshot.row_count() == 0) {
            return out;
        }

        // Keep, per db_name, the row with the largest updated_at.
        std::unordered_map<std::string, database_meta> latest;
        for (const auto& row : snapshot.rows()) {
            auto db_name_it = row.find("db_name");
            auto owner_it = row.find("owner");
            auto encoding_it = row.find("encoding");
            auto lc_collate_it = row.find("lc_collate");
            auto lc_ctype_it = row.find("lc_ctype");
            auto template_it = row.find("template_db");
            auto state_it = row.find("state");
            auto updated_it = row.find("updated_at");
            // db_name and state are mandatory; skip malformed rows.
            if (db_name_it == row.end() || state_it == row.end()) {
                continue;
            }

            database_meta meta;
            meta.db_name = deeplake_api::array_to_string(db_name_it->second);
            if (owner_it != row.end()) meta.owner = deeplake_api::array_to_string(owner_it->second);
            if (encoding_it != row.end()) meta.encoding = deeplake_api::array_to_string(encoding_it->second);
            if (lc_collate_it != row.end()) meta.lc_collate = deeplake_api::array_to_string(lc_collate_it->second);
            if (lc_ctype_it != row.end()) meta.lc_ctype = deeplake_api::array_to_string(lc_ctype_it->second);
            if (template_it != row.end()) meta.template_db = deeplake_api::array_to_string(template_it->second);
            meta.state = deeplake_api::array_to_string(state_it->second);
            if (updated_it != row.end()) {
                auto updated_vec = load_int64_vector(updated_it->second);
                meta.updated_at = updated_vec.empty() ? 0 : updated_vec.front();
            }

            // `<=` deliberately lets a later row win on an updated_at tie, so
            // the most recently scanned duplicate prevails.
            // NOTE(review): with equal timestamps the winner depends on
            // snapshot row order — confirm rows() iterates in write order.
            auto it = latest.find(meta.db_name);
            if (it == latest.end() || it->second.updated_at <= meta.updated_at) {
                latest[meta.db_name] = std::move(meta);
            }
        }

        out.reserve(latest.size());
        for (auto& [_, meta] : latest) {
            if (meta.state == "ready") {
                out.push_back(std::move(meta));
            }
        }
        return out;
    } catch (const std::exception& e) {
        elog(WARNING, "Failed to load catalog databases: %s", e.what());
        return out;
    } catch (...) {
        elog(WARNING, "Failed to load catalog databases: unknown error");
        return out;
    }
}

void upsert_database(const std::string& root_path, icm::string_map<> creds, const database_meta& meta)
{
auto table = open_catalog_table(root_path, k_databases_name, std::move(creds));
icm::string_map<nd::array> row;
row["db_name"] = nd::adapt(meta.db_name);
row["owner"] = nd::adapt(meta.owner);
row["encoding"] = nd::adapt(meta.encoding);
row["lc_collate"] = nd::adapt(meta.lc_collate);
row["lc_ctype"] = nd::adapt(meta.lc_ctype);
row["template_db"] = nd::adapt(meta.template_db);
row["state"] = nd::adapt(meta.state);
row["updated_at"] = nd::adapt(meta.updated_at == 0 ? now_ms() : meta.updated_at);
table->upsert(std::move(row)).get_future().get();
}

int64_t get_catalog_version(const std::string& root_path, icm::string_map<> creds)
{
try {
Expand Down
15 changes: 15 additions & 0 deletions cpp/deeplake_pg/dl_catalog.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,18 @@ struct index_meta
int32_t order_type = 0;
};

// One row of the "databases" catalog table: mirrors the options captured
// from CREATE DATABASE plus lifecycle bookkeeping for stateless mode.
struct database_meta
{
    std::string db_name;      // PK — unique database name
    std::string owner;        // CREATE DATABASE ... OWNER
    std::string encoding;     // CREATE DATABASE ... ENCODING
    std::string lc_collate;   // CREATE DATABASE ... LC_COLLATE
    std::string lc_ctype;     // CREATE DATABASE ... LC_CTYPE
    std::string template_db;  // CREATE DATABASE ... TEMPLATE
    std::string state;        // "ready" or "dropping"
    int64_t updated_at = 0;   // last-write timestamp; 0 means "not yet stamped"
};

int64_t ensure_catalog(const std::string& root_path, icm::string_map<> creds);

std::vector<table_meta> load_tables(const std::string& root_path, icm::string_map<> creds);
Expand All @@ -49,6 +61,9 @@ load_tables_and_columns(const std::string& root_path, icm::string_map<> creds);
void upsert_table(const std::string& root_path, icm::string_map<> creds, const table_meta& meta);
void upsert_columns(const std::string& root_path, icm::string_map<> creds, const std::vector<column_meta>& columns);

std::vector<database_meta> load_databases(const std::string& root_path, icm::string_map<> creds);
void upsert_database(const std::string& root_path, icm::string_map<> creds, const database_meta& meta);

int64_t get_catalog_version(const std::string& root_path, icm::string_map<> creds);
void bump_catalog_version(const std::string& root_path, icm::string_map<> creds);

Expand Down
77 changes: 77 additions & 0 deletions cpp/deeplake_pg/extension_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ extern "C" {

#include "column_statistics.hpp"
#include "deeplake_executor.hpp"
#include "dl_catalog.hpp"
#include "pg_deeplake.hpp"
#include "pg_version_compat.h"
#include "sync_worker.hpp"
Expand Down Expand Up @@ -473,6 +474,10 @@ static void deeplake_shmem_request()
// Request shared memory for table DDL lock
RequestAddinShmemSpace(pg::table_ddl_lock::get_shmem_size());
RequestNamedLWLockTranche("deeplake_table_ddl", 1);

// Request shared memory for pending extension install queue
RequestAddinShmemSpace(pg::pending_install_queue::get_shmem_size());
RequestNamedLWLockTranche("deeplake_install_queue", 1);
}

static void deeplake_shmem_startup()
Expand All @@ -483,6 +488,7 @@ static void deeplake_shmem_startup()

pg::table_version_tracker::initialize();
pg::table_ddl_lock::initialize();
pg::pending_install_queue::initialize();
}

static void process_utility(PlannedStmt* pstmt,
Expand Down Expand Up @@ -675,12 +681,83 @@ static void process_utility(PlannedStmt* pstmt,
}
}

// Pre-hook: mark database as "dropping" in S3 catalog before PostgreSQL drops it
if (IsA(pstmt->utilityStmt, DropdbStmt) && pg::stateless_enabled) {
DropdbStmt* dbstmt = (DropdbStmt*)pstmt->utilityStmt;
try {
auto root_path = pg::session_credentials::get_root_path();
if (root_path.empty()) {
root_path = pg::utils::get_deeplake_root_directory();
}
if (!root_path.empty()) {
auto creds = pg::session_credentials::get_credentials();
pg::dl_catalog::database_meta db_meta;
db_meta.db_name = dbstmt->dbname;
db_meta.state = "dropping";
pg::dl_catalog::upsert_database(root_path, creds, db_meta);
pg::dl_catalog::bump_catalog_version(root_path, creds);
elog(LOG, "pg_deeplake: marked database '%s' as dropping in catalog", dbstmt->dbname);
}
} catch (const std::exception& e) {
elog(WARNING, "pg_deeplake: failed to mark database '%s' as dropping in catalog: %s", dbstmt->dbname, e.what());
}
}

if (prev_process_utility_hook != nullptr) {
prev_process_utility_hook(pstmt, queryString, readOnlyTree, context, params, queryEnv, dest, completionTag);
} else {
standard_ProcessUtility(pstmt, queryString, readOnlyTree, context, params, queryEnv, dest, completionTag);
}

// Post-hook: record CREATE DATABASE in S3 catalog and install extension
if (IsA(pstmt->utilityStmt, CreatedbStmt)) {
CreatedbStmt* dbstmt = (CreatedbStmt*)pstmt->utilityStmt;

// Queue the database for async extension install by the sync worker.
// The inline PQconnectdb approach fails on PG15+ because CREATE DATABASE
// is WAL-logged/transactional and the pg_database row isn't committed yet.
pg::pending_install_queue::enqueue(dbstmt->dbname);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ignoring return value from enqueue. If queue is full, extension install will be silently skipped with no warning to user.

Suggested change
pg::pending_install_queue::enqueue(dbstmt->dbname);
if (!pg::pending_install_queue::enqueue(dbstmt->dbname)) {
elog(WARNING, "pg_deeplake: failed to queue database '%s' for extension install (queue full)", dbstmt->dbname);
}


// Record in S3 catalog if stateless mode is enabled
if (pg::stateless_enabled) {
try {
auto root_path = pg::session_credentials::get_root_path();
if (root_path.empty()) {
root_path = pg::utils::get_deeplake_root_directory();
}
if (!root_path.empty()) {
auto creds = pg::session_credentials::get_credentials();
pg::dl_catalog::database_meta db_meta;
db_meta.db_name = dbstmt->dbname;
db_meta.state = "ready";

// Extract options from CREATE DATABASE statement
ListCell* lc = nullptr;
foreach (lc, dbstmt->options) {
DefElem* def = (DefElem*)lfirst(lc);
if (strcmp(def->defname, "owner") == 0) {
db_meta.owner = defGetString(def);
} else if (strcmp(def->defname, "encoding") == 0) {
db_meta.encoding = defGetString(def);
} else if (strcmp(def->defname, "lc_collate") == 0) {
db_meta.lc_collate = defGetString(def);
} else if (strcmp(def->defname, "lc_ctype") == 0) {
db_meta.lc_ctype = defGetString(def);
} else if (strcmp(def->defname, "template") == 0) {
db_meta.template_db = defGetString(def);
}
}

pg::dl_catalog::upsert_database(root_path, creds, db_meta);
pg::dl_catalog::bump_catalog_version(root_path, creds);
elog(DEBUG1, "pg_deeplake: recorded CREATE DATABASE '%s' in catalog", dbstmt->dbname);
}
} catch (const std::exception& e) {
elog(DEBUG1, "pg_deeplake: failed to record CREATE DATABASE '%s' in catalog: %s", dbstmt->dbname, e.what());
}
}
}

// Post-process ALTER TABLE ADD COLUMN to add column to deeplake dataset
if (IsA(pstmt->utilityStmt, AlterTableStmt)) {
AlterTableStmt* stmt = (AlterTableStmt*)pstmt->utilityStmt;
Expand Down
Loading