Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion duckdb
Submodule duckdb updated 370 files
2 changes: 2 additions & 0 deletions src/include/storage/postgres_catalog.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ class PostgresCatalog : public Catalog {
PhysicalOperator &plan) override;
PhysicalOperator &PlanUpdate(ClientContext &context, PhysicalPlanGenerator &planner, LogicalUpdate &op,
PhysicalOperator &plan) override;
PhysicalOperator &PlanMergeInto(ClientContext &context, PhysicalPlanGenerator &planner, LogicalMergeInto &op,
PhysicalOperator &plan) override;

unique_ptr<LogicalOperator> BindCreateIndex(Binder &binder, CreateStatement &stmt, TableCatalogEntry &table,
unique_ptr<LogicalOperator> plan) override;
Expand Down
2 changes: 2 additions & 0 deletions src/include/storage/postgres_insert.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ class PostgresInsert : public PhysicalOperator {
unique_ptr<BoundCreateTableInfo> info;
//! column_index_map
physical_index_vector_t<idx_t> column_index_map;
//! Whether or not we can keep the copy alive during Sink calls
bool keep_copy_alive = true;

public:
// Source interface
Expand Down
6 changes: 5 additions & 1 deletion src/include/storage/postgres_update.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,16 @@ namespace duckdb {
class PostgresUpdate : public PhysicalOperator {
public:
PostgresUpdate(PhysicalPlan &physical_plan, LogicalOperator &op, TableCatalogEntry &table,
vector<PhysicalIndex> columns);
vector<PhysicalIndex> columns, vector<unique_ptr<Expression>> expressions);

//! The table to delete from
TableCatalogEntry &table;
//! The set of columns to update
vector<PhysicalIndex> columns;
//! Expressions to execute
vector<unique_ptr<Expression>> expressions;
//! Whether or not we can keep the copy alive during Sink calls
bool keep_copy_alive = true;

public:
// Source interface
Expand Down
1 change: 1 addition & 0 deletions src/storage/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ add_library(
postgres_index_entry.cpp
postgres_index_set.cpp
postgres_insert.cpp
postgres_merge_into.cpp
postgres_optimizer.cpp
postgres_schema_entry.cpp
postgres_schema_set.cpp
Expand Down
51 changes: 35 additions & 16 deletions src/storage/postgres_insert.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,25 @@ PostgresInsert::PostgresInsert(PhysicalPlan &physical_plan, LogicalOperator &op,
//===--------------------------------------------------------------------===//
class PostgresInsertGlobalState : public GlobalSinkState {
public:
explicit PostgresInsertGlobalState(ClientContext &context, PostgresTableEntry *table)
: table(table), insert_count(0) {
explicit PostgresInsertGlobalState(ClientContext &context, PostgresTableEntry &table, PostgresCopyFormat format)
: table(table), insert_count(0), format(format) {
}

PostgresTableEntry *table;
PostgresTableEntry &table;
PostgresCopyState copy_state;
DataChunk varchar_chunk;
idx_t insert_count;
PostgresCopyFormat format;
vector<string> insert_column_names;
bool copy_is_active = false;

void FinishCopyTo(PostgresConnection &connection) {
if (!copy_is_active) {
return;
}
connection.FinishCopyTo(copy_state);
copy_is_active = false;
}
};

vector<string> GetInsertColumns(const PostgresInsert &insert, PostgresTableEntry &entry) {
Expand Down Expand Up @@ -68,7 +79,7 @@ vector<string> GetInsertColumns(const PostgresInsert &insert, PostgresTableEntry
}

unique_ptr<GlobalSinkState> PostgresInsert::GetGlobalSinkState(ClientContext &context) const {
PostgresTableEntry *insert_table;
optional_ptr<PostgresTableEntry> insert_table;
if (!table) {
auto &schema_ref = *schema.get_mutable();
insert_table =
Expand All @@ -79,9 +90,9 @@ unique_ptr<GlobalSinkState> PostgresInsert::GetGlobalSinkState(ClientContext &co
auto &transaction = PostgresTransaction::Get(context, insert_table->catalog);
auto &connection = transaction.GetConnection();
auto insert_columns = GetInsertColumns(*this, *insert_table);
auto result = make_uniq<PostgresInsertGlobalState>(context, insert_table);
auto format = insert_table->GetCopyFormat(context);
vector<string> insert_column_names;
auto result = make_uniq<PostgresInsertGlobalState>(context, *insert_table, format);
auto &insert_column_names = result->insert_column_names;
if (!insert_columns.empty()) {
for (auto &str : insert_columns) {
auto index = insert_table->GetColumnIndex(str, true);
Expand All @@ -92,20 +103,28 @@ unique_ptr<GlobalSinkState> PostgresInsert::GetGlobalSinkState(ClientContext &co
}
}
}
connection.BeginCopyTo(context, result->copy_state, format, insert_table->schema.name, insert_table->name,
insert_column_names);
return std::move(result);
}

//===--------------------------------------------------------------------===//
// Sink
//===--------------------------------------------------------------------===//
SinkResultType PostgresInsert::Sink(ExecutionContext &context, DataChunk &chunk, OperatorSinkInput &input) const {
auto &gstate = sink_state->Cast<PostgresInsertGlobalState>();
auto &transaction = PostgresTransaction::Get(context.client, gstate.table->catalog);
auto &gstate = input.global_state.Cast<PostgresInsertGlobalState>();
auto &transaction = PostgresTransaction::Get(context.client, gstate.table.catalog);
auto &connection = transaction.GetConnection();
if (!gstate.copy_is_active) {
// copy hasn't started yet
connection.BeginCopyTo(context.client, gstate.copy_state, gstate.format, gstate.table.schema.name,
gstate.table.name, gstate.insert_column_names);
gstate.copy_is_active = true;
}
connection.CopyChunk(context.client, gstate.copy_state, chunk, gstate.varchar_chunk);
gstate.insert_count += chunk.size();
if (!keep_copy_alive) {
// if we are can't keep the copy alive we need to restart the copy during every sink
gstate.FinishCopyTo(connection);
}
return SinkResultType::NEED_MORE_INPUT;
}

Expand All @@ -114,15 +133,15 @@ SinkResultType PostgresInsert::Sink(ExecutionContext &context, DataChunk &chunk,
//===--------------------------------------------------------------------===//
SinkFinalizeType PostgresInsert::Finalize(Pipeline &pipeline, Event &event, ClientContext &context,
OperatorSinkFinalizeInput &input) const {
auto &gstate = sink_state->Cast<PostgresInsertGlobalState>();
auto &transaction = PostgresTransaction::Get(context, gstate.table->catalog);
auto &gstate = input.global_state.Cast<PostgresInsertGlobalState>();
auto &transaction = PostgresTransaction::Get(context, gstate.table.catalog);
auto &connection = transaction.GetConnection();
connection.FinishCopyTo(gstate.copy_state);
gstate.FinishCopyTo(connection);
// update the approx_num_pages - approximately 8 bytes per column per row
idx_t bytes_per_page = 8192;
idx_t bytes_per_row = gstate.table->GetColumns().LogicalColumnCount() * 8;
idx_t bytes_per_row = gstate.table.GetColumns().LogicalColumnCount() * 8;
idx_t rows_per_page = MaxValue<idx_t>(1, bytes_per_page / bytes_per_row);
gstate.table->approx_num_pages += gstate.insert_count / rows_per_page;
gstate.table.approx_num_pages += gstate.insert_count / rows_per_page;
return SinkFinalizeType::READY;
}

Expand Down Expand Up @@ -218,7 +237,7 @@ PhysicalOperator &PostgresCatalog::PlanInsert(ClientContext &context, PhysicalPl
if (op.return_chunk) {
throw BinderException("RETURNING clause not yet supported for insertion into Postgres table");
}
if (op.action_type != OnConflictAction::THROW) {
if (op.on_conflict_info.action_type != OnConflictAction::THROW) {
throw BinderException("ON CONFLICT clause not yet supported for insertion into Postgres table");
}

Expand Down
123 changes: 123 additions & 0 deletions src/storage/postgres_merge_into.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
#include "storage/postgres_catalog.hpp"
#include "duckdb/execution/operator/persistent/physical_merge_into.hpp"
#include "duckdb/planner/operator/logical_merge_into.hpp"
#include "storage/postgres_update.hpp"
#include "storage/postgres_delete.hpp"
#include "storage/postgres_insert.hpp"
#include "duckdb/planner/operator/logical_update.hpp"
#include "duckdb/planner/operator/logical_dummy_scan.hpp"
#include "duckdb/planner/operator/logical_delete.hpp"
#include "duckdb/planner/operator/logical_insert.hpp"
#include "duckdb/planner/expression/bound_reference_expression.hpp"
#include "duckdb/parallel/thread_context.hpp"
#include "duckdb/catalog/catalog_entry/table_catalog_entry.hpp"

namespace duckdb {

//===--------------------------------------------------------------------===//
// Plan Merge Into
//===--------------------------------------------------------------------===//
unique_ptr<MergeIntoOperator> PostgresPlanMergeIntoAction(PostgresCatalog &catalog, ClientContext &context,
LogicalMergeInto &op, PhysicalPlanGenerator &planner,
BoundMergeIntoAction &action, PhysicalOperator &child_plan) {
auto result = make_uniq<MergeIntoOperator>();

result->action_type = action.action_type;
result->condition = std::move(action.condition);
vector<unique_ptr<BoundConstraint>> bound_constraints;
for (auto &constraint : op.bound_constraints) {
bound_constraints.push_back(constraint->Copy());
}
auto return_types = op.types;

switch (action.action_type) {
case MergeActionType::MERGE_UPDATE: {
if (action.columns.empty()) {
// not updating any columns
result->action_type = MergeActionType::MERGE_DO_NOTHING;
break;
}
LogicalUpdate update(op.table);
for (auto &def : op.bound_defaults) {
update.bound_defaults.push_back(def->Copy());
}
update.bound_constraints = std::move(bound_constraints);
update.expressions = std::move(action.expressions);
update.columns = std::move(action.columns);
update.update_is_del_and_insert = action.update_is_del_and_insert;
result->op = catalog.PlanUpdate(context, planner, update, child_plan);
auto &pg_update = result->op->Cast<PostgresUpdate>();
pg_update.keep_copy_alive = false;
break;
}
case MergeActionType::MERGE_DELETE: {
LogicalDelete delete_op(op.table, 0);
auto ref = make_uniq<BoundReferenceExpression>(LogicalType::BIGINT, op.row_id_start);
delete_op.expressions.push_back(std::move(ref));
delete_op.bound_constraints = std::move(bound_constraints);
result->op = catalog.PlanDelete(context, planner, delete_op, child_plan);
break;
}
case MergeActionType::MERGE_INSERT: {
LogicalInsert insert_op(op.table, 0);
insert_op.bound_constraints = std::move(bound_constraints);
for (auto &def : op.bound_defaults) {
insert_op.bound_defaults.push_back(def->Copy());
}
// transform expressions if required
if (!action.column_index_map.empty()) {
vector<unique_ptr<Expression>> new_expressions;
for (auto &col : op.table.GetColumns().Physical()) {
auto storage_idx = col.StorageOid();
auto mapped_index = action.column_index_map[col.Physical()];
if (mapped_index == DConstants::INVALID_INDEX) {
// push default value
new_expressions.push_back(op.bound_defaults[storage_idx]->Copy());
} else {
// push reference
new_expressions.push_back(std::move(action.expressions[mapped_index]));
}
}
action.expressions = std::move(new_expressions);
}
result->expressions = std::move(action.expressions);
result->op = catalog.PlanInsert(context, planner, insert_op, child_plan);
// MERGE cannot keep the copy alive because we can interleave with other operations
auto &pg_insert = result->op->Cast<PostgresInsert>();
pg_insert.keep_copy_alive = false;
break;
}
case MergeActionType::MERGE_ERROR:
result->expressions = std::move(action.expressions);
break;
case MergeActionType::MERGE_DO_NOTHING:
break;
default:
throw InternalException("Unsupported merge action");
}
return result;
}

PhysicalOperator &PostgresCatalog::PlanMergeInto(ClientContext &context, PhysicalPlanGenerator &planner,
LogicalMergeInto &op, PhysicalOperator &plan) {
if (op.return_chunk) {
throw NotImplementedException("RETURNING is not implemented for Postgres yet");
}
map<MergeActionCondition, vector<unique_ptr<MergeIntoOperator>>> actions;

// plan the merge into clauses
for (auto &entry : op.actions) {
vector<unique_ptr<MergeIntoOperator>> planned_actions;
for (auto &action : entry.second) {
planned_actions.push_back(PostgresPlanMergeIntoAction(*this, context, op, planner, *action, plan));
}
actions.emplace(entry.first, std::move(planned_actions));
}

auto &result = planner.Make<PhysicalMergeInto>(op.types, std::move(actions), op.row_id_start, op.source_marker,
false, op.return_chunk);
result.children.push_back(plan);
return result;
}

} // namespace duckdb
21 changes: 20 additions & 1 deletion src/storage/postgres_table_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "storage/postgres_transaction.hpp"
#include "duckdb/storage/statistics/base_statistics.hpp"
#include "duckdb/storage/table_storage_info.hpp"
#include "duckdb/parser/constraints/unique_constraint.hpp"
#include "postgres_scanner.hpp"

namespace duckdb {
Expand Down Expand Up @@ -68,7 +69,25 @@ TableStorageInfo PostgresTableEntry::GetStorageInfo(ClientContext &context) {
auto &db = transaction.GetConnection();
TableStorageInfo result;
result.cardinality = 0;
result.index_info = db.GetIndexInfo(name);
// get index info based on constraints
for (auto &constraint : constraints) {
if (constraint->type != ConstraintType::UNIQUE) {
continue;
}
IndexInfo info;
auto &unique = constraint->Cast<UniqueConstraint>();
info.is_unique = true;
info.is_primary = unique.IsPrimaryKey();
info.is_foreign = false;
if (unique.HasIndex()) {
info.column_set.insert(unique.GetIndex().index);
} else {
for (auto &name : unique.GetColumnNames()) {
info.column_set.insert(columns.GetColumn(name).Logical().index);
}
}
result.index_info.push_back(std::move(info));
}
return result;
}

Expand Down
Loading
Loading