diff --git a/duckdb b/duckdb
index ff0f95954..482b5702f 160000
--- a/duckdb
+++ b/duckdb
@@ -1 +1 @@
-Subproject commit ff0f95954bdb4c7515481ac2b473261407bad18b
+Subproject commit 482b5702f78ee4122612cfe4de6e373e8e1ac963
diff --git a/src/include/storage/postgres_catalog.hpp b/src/include/storage/postgres_catalog.hpp
index a49f31b9f..d6f273a9a 100644
--- a/src/include/storage/postgres_catalog.hpp
+++ b/src/include/storage/postgres_catalog.hpp
@@ -55,6 +55,8 @@ class PostgresCatalog : public Catalog {
 	                             PhysicalOperator &plan) override;
 	PhysicalOperator &PlanUpdate(ClientContext &context, PhysicalPlanGenerator &planner, LogicalUpdate &op,
 	                             PhysicalOperator &plan) override;
+	PhysicalOperator &PlanMergeInto(ClientContext &context, PhysicalPlanGenerator &planner, LogicalMergeInto &op,
+	                                PhysicalOperator &plan) override;
 	unique_ptr<LogicalOperator> BindCreateIndex(Binder &binder, CreateStatement &stmt, TableCatalogEntry &table,
 	                                            unique_ptr<LogicalOperator> plan) override;
 
diff --git a/src/include/storage/postgres_insert.hpp b/src/include/storage/postgres_insert.hpp
index 7dc301a3b..d22b4f195 100644
--- a/src/include/storage/postgres_insert.hpp
+++ b/src/include/storage/postgres_insert.hpp
@@ -30,6 +30,8 @@ class PostgresInsert : public PhysicalOperator {
 	unique_ptr<BoundCreateTableInfo> info;
 	//! column_index_map
 	physical_index_vector_t<idx_t> column_index_map;
+	//! Whether or not we can keep the copy alive during Sink calls
+	bool keep_copy_alive = true;
 
 public:
 	// Source interface
diff --git a/src/include/storage/postgres_update.hpp b/src/include/storage/postgres_update.hpp
index 6000d1692..12b5e5e7a 100644
--- a/src/include/storage/postgres_update.hpp
+++ b/src/include/storage/postgres_update.hpp
@@ -16,12 +16,16 @@ namespace duckdb {
 class PostgresUpdate : public PhysicalOperator {
 public:
 	PostgresUpdate(PhysicalPlan &physical_plan, LogicalOperator &op, TableCatalogEntry &table,
-	               vector<PhysicalIndex> columns);
+	               vector<PhysicalIndex> columns, vector<unique_ptr<Expression>> expressions);
 
 	//! The table to delete from
 	TableCatalogEntry &table;
 	//! The set of columns to update
 	vector<PhysicalIndex> columns;
+	//! Expressions to execute
+	vector<unique_ptr<Expression>> expressions;
+	//! Whether or not we can keep the copy alive during Sink calls
+	bool keep_copy_alive = true;
 
 public:
 	// Source interface
diff --git a/src/storage/CMakeLists.txt b/src/storage/CMakeLists.txt
index e6def3516..f4a66f302 100644
--- a/src/storage/CMakeLists.txt
+++ b/src/storage/CMakeLists.txt
@@ -9,6 +9,7 @@ add_library(
   postgres_index_entry.cpp
   postgres_index_set.cpp
   postgres_insert.cpp
+  postgres_merge_into.cpp
   postgres_optimizer.cpp
   postgres_schema_entry.cpp
   postgres_schema_set.cpp
diff --git a/src/storage/postgres_insert.cpp b/src/storage/postgres_insert.cpp
index fcd710930..e3961e4f9 100644
--- a/src/storage/postgres_insert.cpp
+++ b/src/storage/postgres_insert.cpp
@@ -31,14 +31,25 @@ PostgresInsert::PostgresInsert(PhysicalPlan &physical_plan, LogicalOperator &op,
 //===--------------------------------------------------------------------===//
 class PostgresInsertGlobalState : public GlobalSinkState {
 public:
-	explicit PostgresInsertGlobalState(ClientContext &context, PostgresTableEntry *table)
-	    : table(table), insert_count(0) {
+	explicit PostgresInsertGlobalState(ClientContext &context, PostgresTableEntry &table, PostgresCopyFormat format)
+	    : table(table), insert_count(0), format(format) {
 	}
 
-	PostgresTableEntry *table;
+	PostgresTableEntry &table;
 	PostgresCopyState copy_state;
 	DataChunk varchar_chunk;
 	idx_t insert_count;
+	PostgresCopyFormat format;
+	vector<string> insert_column_names;
+	bool copy_is_active = false;
+
+	void FinishCopyTo(PostgresConnection &connection) {
+		if (!copy_is_active) {
+			return;
+		}
+		connection.FinishCopyTo(copy_state);
+		copy_is_active = false;
+	}
 };
 
 vector<string> GetInsertColumns(const PostgresInsert &insert, PostgresTableEntry &entry) {
@@ -68,7 +79,7 @@ vector<string> GetInsertColumns(const PostgresInsert &insert, PostgresTableEntry
 }
 
 unique_ptr<GlobalSinkState> PostgresInsert::GetGlobalSinkState(ClientContext &context) const {
-	PostgresTableEntry *insert_table;
+	optional_ptr<PostgresTableEntry> insert_table;
 	if (!table) {
 		auto &schema_ref = *schema.get_mutable();
 		insert_table =
@@ -79,9 +90,9 @@ unique_ptr<GlobalSinkState> PostgresInsert::GetGlobalSinkState(ClientContext &co
 	auto &transaction = PostgresTransaction::Get(context, insert_table->catalog);
 	auto &connection = transaction.GetConnection();
 	auto insert_columns = GetInsertColumns(*this, *insert_table);
-	auto result = make_uniq<PostgresInsertGlobalState>(context, insert_table);
 	auto format = insert_table->GetCopyFormat(context);
-	vector<string> insert_column_names;
+	auto result = make_uniq<PostgresInsertGlobalState>(context, *insert_table, format);
+	auto &insert_column_names = result->insert_column_names;
 	if (!insert_columns.empty()) {
 		for (auto &str : insert_columns) {
 			auto index = insert_table->GetColumnIndex(str, true);
@@ -92,8 +103,6 @@ unique_ptr<GlobalSinkState> PostgresInsert::GetGlobalSinkState(ClientContext &co
 			}
 		}
 	}
-	connection.BeginCopyTo(context, result->copy_state, format, insert_table->schema.name, insert_table->name,
-	                       insert_column_names);
 	return std::move(result);
 }
 
@@ -101,11 +110,21 @@ unique_ptr<GlobalSinkState> PostgresInsert::GetGlobalSinkState(ClientContext &co
 // Sink
 //===--------------------------------------------------------------------===//
 SinkResultType PostgresInsert::Sink(ExecutionContext &context, DataChunk &chunk, OperatorSinkInput &input) const {
-	auto &gstate = sink_state->Cast<PostgresInsertGlobalState>();
-	auto &transaction = PostgresTransaction::Get(context.client, gstate.table->catalog);
+	auto &gstate = input.global_state.Cast<PostgresInsertGlobalState>();
+	auto &transaction = PostgresTransaction::Get(context.client, gstate.table.catalog);
 	auto &connection = transaction.GetConnection();
+	if (!gstate.copy_is_active) {
+		// copy hasn't started yet
+		connection.BeginCopyTo(context.client, gstate.copy_state, gstate.format, gstate.table.schema.name,
+		                       gstate.table.name, gstate.insert_column_names);
+		gstate.copy_is_active = true;
+	}
 	connection.CopyChunk(context.client, gstate.copy_state, chunk, gstate.varchar_chunk);
 	gstate.insert_count += chunk.size();
+	if (!keep_copy_alive) {
+		// if we can't keep the copy alive, we need to restart the copy during every Sink call
+		gstate.FinishCopyTo(connection);
+	}
 	return SinkResultType::NEED_MORE_INPUT;
 }
 
@@ -114,15 +133,15 @@ SinkResultType PostgresInsert::Sink(ExecutionContext &context, DataChunk &chunk,
 //===--------------------------------------------------------------------===//
 SinkFinalizeType PostgresInsert::Finalize(Pipeline &pipeline, Event &event, ClientContext &context,
                                           OperatorSinkFinalizeInput &input) const {
-	auto &gstate = sink_state->Cast<PostgresInsertGlobalState>();
-	auto &transaction = PostgresTransaction::Get(context, gstate.table->catalog);
+	auto &gstate = input.global_state.Cast<PostgresInsertGlobalState>();
+	auto &transaction = PostgresTransaction::Get(context, gstate.table.catalog);
 	auto &connection = transaction.GetConnection();
-	connection.FinishCopyTo(gstate.copy_state);
+	gstate.FinishCopyTo(connection);
 	// update the approx_num_pages - approximately 8 bytes per column per row
 	idx_t bytes_per_page = 8192;
-	idx_t bytes_per_row = gstate.table->GetColumns().LogicalColumnCount() * 8;
+	idx_t bytes_per_row = gstate.table.GetColumns().LogicalColumnCount() * 8;
 	idx_t rows_per_page = MaxValue<idx_t>(1, bytes_per_page / bytes_per_row);
-	gstate.table->approx_num_pages += gstate.insert_count / rows_per_page;
+	gstate.table.approx_num_pages += gstate.insert_count / rows_per_page;
 	return SinkFinalizeType::READY;
 }
 
@@ -218,7 +237,7 @@ PhysicalOperator &PostgresCatalog::PlanInsert(ClientContext &context, PhysicalPl
 	if (op.return_chunk) {
 		throw BinderException("RETURNING clause not yet supported for insertion into Postgres table");
 	}
-	if (op.action_type != OnConflictAction::THROW) {
+	if (op.on_conflict_info.action_type != OnConflictAction::THROW) {
 		throw BinderException("ON CONFLICT clause not yet supported for insertion into Postgres table");
 	}
 
diff --git a/src/storage/postgres_merge_into.cpp b/src/storage/postgres_merge_into.cpp
new file mode 100644
index 000000000..03188d612
--- /dev/null
+++ b/src/storage/postgres_merge_into.cpp
@@ -0,0 +1,123 @@
+#include "storage/postgres_catalog.hpp"
+#include "duckdb/execution/operator/persistent/physical_merge_into.hpp"
+#include "duckdb/planner/operator/logical_merge_into.hpp"
+#include "storage/postgres_update.hpp"
+#include "storage/postgres_delete.hpp"
+#include "storage/postgres_insert.hpp"
+#include "duckdb/planner/operator/logical_update.hpp"
+#include "duckdb/planner/operator/logical_dummy_scan.hpp"
+#include "duckdb/planner/operator/logical_delete.hpp"
+#include "duckdb/planner/operator/logical_insert.hpp"
+#include "duckdb/planner/expression/bound_reference_expression.hpp"
+#include "duckdb/parallel/thread_context.hpp"
+#include "duckdb/catalog/catalog_entry/table_catalog_entry.hpp"
+
+namespace duckdb {
+
+//===--------------------------------------------------------------------===//
+// Plan Merge Into
+//===--------------------------------------------------------------------===//
+unique_ptr<MergeIntoOperator> PostgresPlanMergeIntoAction(PostgresCatalog &catalog, ClientContext &context,
+                                                          LogicalMergeInto &op, PhysicalPlanGenerator &planner,
+                                                          BoundMergeIntoAction &action, PhysicalOperator &child_plan) {
+	auto result = make_uniq<MergeIntoOperator>();
+
+	result->action_type = action.action_type;
+	result->condition = std::move(action.condition);
+	vector<unique_ptr<BoundConstraint>> bound_constraints;
+	for (auto &constraint : op.bound_constraints) {
+		bound_constraints.push_back(constraint->Copy());
+	}
+	auto return_types = op.types;
+
+	switch (action.action_type) {
+	case MergeActionType::MERGE_UPDATE: {
+		if (action.columns.empty()) {
+			// not updating any columns
+			result->action_type = MergeActionType::MERGE_DO_NOTHING;
+			break;
+		}
+		LogicalUpdate update(op.table);
+		for (auto &def : op.bound_defaults) {
+			update.bound_defaults.push_back(def->Copy());
+		}
+		update.bound_constraints = std::move(bound_constraints);
+		update.expressions = std::move(action.expressions);
+		update.columns = std::move(action.columns);
+		update.update_is_del_and_insert = action.update_is_del_and_insert;
+		result->op = catalog.PlanUpdate(context, planner, update, child_plan);
+		auto &pg_update = result->op->Cast<PostgresUpdate>();
+		pg_update.keep_copy_alive = false;
+		break;
+	}
+	case MergeActionType::MERGE_DELETE: {
+		LogicalDelete delete_op(op.table, 0);
+		auto ref = make_uniq<BoundReferenceExpression>(LogicalType::BIGINT, op.row_id_start);
+		delete_op.expressions.push_back(std::move(ref));
+		delete_op.bound_constraints = std::move(bound_constraints);
+		result->op = catalog.PlanDelete(context, planner, delete_op, child_plan);
+		break;
+	}
+	case MergeActionType::MERGE_INSERT: {
+		LogicalInsert insert_op(op.table, 0);
+		insert_op.bound_constraints = std::move(bound_constraints);
+		for (auto &def : op.bound_defaults) {
+			insert_op.bound_defaults.push_back(def->Copy());
+		}
+		// transform expressions if required
+		if (!action.column_index_map.empty()) {
+			vector<unique_ptr<Expression>> new_expressions;
+			for (auto &col : op.table.GetColumns().Physical()) {
+				auto storage_idx = col.StorageOid();
+				auto mapped_index = action.column_index_map[col.Physical()];
+				if (mapped_index == DConstants::INVALID_INDEX) {
+					// push default value
+					new_expressions.push_back(op.bound_defaults[storage_idx]->Copy());
+				} else {
+					// push reference
+					new_expressions.push_back(std::move(action.expressions[mapped_index]));
+				}
+			}
+			action.expressions = std::move(new_expressions);
+		}
+		result->expressions = std::move(action.expressions);
+		result->op = catalog.PlanInsert(context, planner, insert_op, child_plan);
+		// MERGE cannot keep the copy alive because we can interleave with other operations
+		auto &pg_insert = result->op->Cast<PostgresInsert>();
+		pg_insert.keep_copy_alive = false;
+		break;
+	}
+	case MergeActionType::MERGE_ERROR:
+		result->expressions = std::move(action.expressions);
+		break;
+	case MergeActionType::MERGE_DO_NOTHING:
+		break;
+	default:
+		throw InternalException("Unsupported merge action");
+	}
+	return result;
+}
+
+PhysicalOperator &PostgresCatalog::PlanMergeInto(ClientContext &context, PhysicalPlanGenerator &planner,
+                                                 LogicalMergeInto &op, PhysicalOperator &plan) {
+	if (op.return_chunk) {
+		throw NotImplementedException("RETURNING is not implemented for Postgres yet");
+	}
+	map<MergeActionCondition, vector<unique_ptr<MergeIntoOperator>>> actions;
+
+	// plan the merge into clauses
+	for (auto &entry : op.actions) {
+		vector<unique_ptr<MergeIntoOperator>> planned_actions;
+		for (auto &action : entry.second) {
+			planned_actions.push_back(PostgresPlanMergeIntoAction(*this, context, op, planner, *action, plan));
+		}
+		actions.emplace(entry.first, std::move(planned_actions));
+	}
+
+	auto &result = planner.Make<PhysicalMergeInto>(op.types, std::move(actions), op.row_id_start, op.source_marker,
+	                                               false, op.return_chunk);
+	result.children.push_back(plan);
+	return result;
+}
+
+} // namespace duckdb
diff --git a/src/storage/postgres_table_entry.cpp b/src/storage/postgres_table_entry.cpp
index 6d5f9e73c..cea738752 100644
--- a/src/storage/postgres_table_entry.cpp
+++ b/src/storage/postgres_table_entry.cpp
@@ -3,6 +3,7 @@
 #include "storage/postgres_transaction.hpp"
 #include "duckdb/storage/statistics/base_statistics.hpp"
 #include "duckdb/storage/table_storage_info.hpp"
+#include "duckdb/parser/constraints/unique_constraint.hpp"
 #include "postgres_scanner.hpp"
 
 namespace duckdb {
@@ -68,7 +69,25 @@ TableStorageInfo PostgresTableEntry::GetStorageInfo(ClientContext &context) {
 	auto &db = transaction.GetConnection();
 	TableStorageInfo result;
 	result.cardinality = 0;
-	result.index_info = db.GetIndexInfo(name);
+	// get index info based on constraints
+	for (auto &constraint : constraints) {
+		if (constraint->type != ConstraintType::UNIQUE) {
+			continue;
+		}
+		IndexInfo info;
+		auto &unique = constraint->Cast<UniqueConstraint>();
+		info.is_unique = true;
+		info.is_primary = unique.IsPrimaryKey();
+		info.is_foreign = false;
+		if (unique.HasIndex()) {
+			info.column_set.insert(unique.GetIndex().index);
+		} else {
+			for (auto &name : unique.GetColumnNames()) {
+				info.column_set.insert(columns.GetColumn(name).Logical().index);
+			}
+		}
+		result.index_info.push_back(std::move(info));
+	}
 	return result;
 }
 
diff --git a/src/storage/postgres_update.cpp b/src/storage/postgres_update.cpp
index 9defc9e6b..8c3b8a305 100644
--- a/src/storage/postgres_update.cpp
+++ b/src/storage/postgres_update.cpp
@@ -5,13 +5,14 @@
 #include "storage/postgres_transaction.hpp"
 #include "postgres_connection.hpp"
 #include "duckdb/common/types/uuid.hpp"
+#include "duckdb/planner/expression/bound_reference_expression.hpp"
 
 namespace duckdb {
 
 PostgresUpdate::PostgresUpdate(PhysicalPlan &physical_plan, LogicalOperator &op, TableCatalogEntry &table,
-                               vector<PhysicalIndex> columns_p)
+                               vector<PhysicalIndex> columns_p, vector<unique_ptr<Expression>> expressions_p)
     : PhysicalOperator(physical_plan, PhysicalOperatorType::EXTENSION, op.types, 1), table(table),
-      columns(std::move(columns_p)) {
+      columns(std::move(columns_p)), expressions(std::move(expressions_p)) {
 }
 
 //===--------------------------------------------------------------------===//
@@ -27,7 +28,17 @@ class PostgresUpdateGlobalState : public GlobalSinkState {
 	DataChunk insert_chunk;
 	DataChunk varchar_chunk;
 	string update_sql;
+	string update_table_name;
 	idx_t update_count;
+	bool copy_is_active = false;
+
+	void FinishCopyTo(PostgresConnection &connection) {
+		if (!copy_is_active) {
+			return;
+		}
+		connection.FinishCopyTo(copy_state);
+		copy_is_active = false;
+	}
 };
 
 string CreateUpdateTable(const string &name, PostgresTableEntry &table, const vector<PhysicalIndex> &index) {
@@ -35,16 +46,14 @@ string CreateUpdateTable(const string &name, PostgresTableEntry &table, const ve
 	result = "CREATE LOCAL TEMPORARY TABLE " + PostgresUtils::QuotePostgresIdentifier(name);
 	result += "(";
 	for (idx_t i = 0; i < index.size(); i++) {
-		if (i > 0) {
-			result += ", ";
-		}
 		auto &column_name = table.postgres_names[index[i].index];
 		auto &col = table.GetColumn(LogicalIndex(index[i].index));
 		result += KeywordHelper::WriteQuoted(column_name, '"');
 		result += " ";
 		result += PostgresUtils::TypeToString(col.GetType());
+		result += ", ";
 	}
-	result += ", __page_id_string VARCHAR) ON COMMIT DROP;";
+	result += "__page_id_string VARCHAR) ON COMMIT DROP;";
 	return result;
 }
 
@@ -79,10 +88,10 @@ unique_ptr<GlobalSinkState> PostgresUpdate::GetGlobalSinkState(ClientContext &co
 	auto result = make_uniq<PostgresUpdateGlobalState>(postgres_table);
 	auto &connection = transaction.GetConnection();
 	// create a temporary table to stream the update data into
-	auto table_name = "update_data_" + UUID::ToString(UUID::GenerateRandomUUID());
-	connection.Execute(CreateUpdateTable(table_name, postgres_table, columns));
+	result->update_table_name = "update_data_" + UUID::ToString(UUID::GenerateRandomUUID());
+	connection.Execute(CreateUpdateTable(result->update_table_name, postgres_table, columns));
 	// generate the final UPDATE sql
-	result->update_sql = GetUpdateSQL(table_name, postgres_table, columns);
+	result->update_sql = GetUpdateSQL(result->update_table_name, postgres_table, columns);
 	// initialize the insertion chunk
 	vector<LogicalType> insert_types;
 	for (idx_t i = 0; i < columns.size(); i++) {
@@ -91,12 +100,6 @@ unique_ptr<GlobalSinkState> PostgresUpdate::GetGlobalSinkState(ClientContext &co
 	}
 	insert_types.push_back(LogicalType::VARCHAR);
 	result->insert_chunk.Initialize(context, insert_types);
-
-	// begin the COPY TO
-	string schema_name;
-	vector<string> column_names;
-	connection.BeginCopyTo(context, result->copy_state, PostgresCopyFormat::TEXT, schema_name, table_name,
-	                       column_names);
 	return std::move(result);
 }
 
@@ -108,8 +111,15 @@ SinkResultType PostgresUpdate::Sink(ExecutionContext &context, DataChunk &chunk,
 	chunk.Flatten();
 	// reference the data columns directly
-	for (idx_t c = 0; c < columns.size(); c++) {
-		gstate.insert_chunk.data[c].Reference(chunk.data[c]);
+	for (idx_t i = 0; i < expressions.size(); i++) {
+		// Default expression, set to the default value of the column.
+		if (expressions[i]->GetExpressionType() == ExpressionType::VALUE_DEFAULT) {
+			throw BinderException("SET DEFAULT is not yet supported for updates of a Postgres table");
+		}
+
+		D_ASSERT(expressions[i]->GetExpressionType() == ExpressionType::BOUND_REF);
+		auto &binding = expressions[i]->Cast<BoundReferenceExpression>();
+		gstate.insert_chunk.data[i].Reference(chunk.data[binding.index]);
 	}
 	// convert our row ids back into ctids
 	auto &row_identifiers = chunk.data[chunk.ColumnCount() - 1];
@@ -134,7 +144,18 @@ SinkResultType PostgresUpdate::Sink(ExecutionContext &context, DataChunk &chunk,
 	auto &transaction = PostgresTransaction::Get(context.client, gstate.table.catalog);
 	auto &connection = transaction.GetConnection();
+	if (!gstate.copy_is_active) {
+		// begin the COPY TO
+		string schema_name;
+		vector<string> column_names;
+		connection.BeginCopyTo(context.client, gstate.copy_state, PostgresCopyFormat::TEXT, schema_name,
+		                       gstate.update_table_name, column_names);
+		gstate.copy_is_active = true;
+	}
 	connection.CopyChunk(context.client, gstate.copy_state, gstate.insert_chunk, gstate.varchar_chunk);
+	if (!keep_copy_alive) {
+		gstate.FinishCopyTo(connection);
+	}
 	gstate.update_count += chunk.size();
 	return SinkResultType::NEED_MORE_INPUT;
 }
 
@@ -147,8 +168,7 @@ SinkFinalizeType PostgresUpdate::Finalize(Pipeline &pipeline, Event &event, Clie
 	auto &gstate = input.global_state.Cast<PostgresUpdateGlobalState>();
 	auto &transaction = PostgresTransaction::Get(context, gstate.table.catalog);
 	auto &connection = transaction.GetConnection();
-	// flush the copy to state
-	connection.FinishCopyTo(gstate.copy_state);
+	gstate.FinishCopyTo(connection);
 	// merge the update_info table into the actual table (i.e. perform the actual update)
 	connection.Execute(gstate.update_sql);
 	return SinkFinalizeType::READY;
 }
 
@@ -187,14 +207,9 @@ PhysicalOperator &PostgresCatalog::PlanUpdate(ClientContext &context, PhysicalPl
 	if (op.return_chunk) {
 		throw BinderException("RETURNING clause not yet supported for updates of a Postgres table");
 	}
-	for (auto &expr : op.expressions) {
-		if (expr->type == ExpressionType::VALUE_DEFAULT) {
-			throw BinderException("SET DEFAULT is not yet supported for updates of a Postgres table");
-		}
-	}
 	PostgresCatalog::MaterializePostgresScans(plan);
-	auto &update = planner.Make<PostgresUpdate>(op, op.table, std::move(op.columns));
+	auto &update = planner.Make<PostgresUpdate>(op, op.table, std::move(op.columns), std::move(op.expressions));
 	update.children.push_back(plan);
 	return update;
 }
diff --git a/test/configs/attach_postgres.json b/test/configs/attach_postgres.json
new file mode 100644
index 000000000..d70eed9d2
--- /dev/null
+++ b/test/configs/attach_postgres.json
@@ -0,0 +1,55 @@
+{
+    "description": "Run on Postgres database as storage.",
+    "on_init": "ATTACH 'postgres:dbname=postgresscanner' AS pgdb; CALL postgres_execute('pgdb', 'DROP DATABASE IF EXISTS \"{BASE_TEST_NAME}\"', use_transaction = False); CALL postgres_execute('pgdb', 'CREATE DATABASE \"{BASE_TEST_NAME}\"', use_transaction = False); DETACH pgdb; ATTACH 'postgres:dbname={BASE_TEST_NAME}' AS pgdb;",
+    "on_new_connection": "USE pgdb;",
+    "on_cleanup": "ATTACH 'postgres:dbname=postgresscanner' AS exit_db; CALL postgres_execute('exit_db', 'DROP DATABASE IF EXISTS \"{BASE_TEST_NAME}\"', use_transaction = False); DETACH exit_db;",
+    "skip_compiled": "true",
+    "statically_loaded_extensions": [
+        "core_functions",
+        "postgres_scanner"
+    ],
+    "skip_error_messages": [
+        "RETURNING is not implemented for Postgres yet",
+        "SET DEFAULT is not yet supported for updates of a Postgres table"
+    ],
+    "skip_tests": [
+        {
+            "reason": "Contains explicit use of the memory catalog.",
+            "paths": [
+                "test/sql/show_select/test_describe_all.test",
+                "test/sql/catalog/function/attached_macro.test",
+                "test/sql/catalog/test_temporary.test",
+                "test/sql/pragma/test_show_tables_temp_views.test",
+                "test/sql/pg_catalog/system_functions.test",
+                "test/sql/pg_catalog/sqlalchemy.test",
+                "test/sql/attach/attach_table_info.test",
+                "test/sql/attach/attach_defaults.test",
+                "test/sql/attach/attach_did_you_mean.test",
+                "test/sql/attach/attach_default_table.test",
+                "test/sql/attach/attach_show_all_tables.test",
+                "test/sql/attach/attach_issue7711.test",
+                "test/sql/attach/attach_issue_7660.test",
+                "test/sql/attach/show_databases.test",
+                "test/sql/attach/attach_views.test",
+                "test/sql/copy_database/copy_table_with_sequence.test",
+                "test/fuzzer/sqlsmith/current_schemas_null.test",
+                "test/sql/settings/drop_set_schema.test",
+                "test/sql/catalog/test_set_search_path.test",
+                "test/sql/attach/attach_table_constraints.test",
+                "test/sql/table_function/database_oid.test",
+                "test/sql/table_function/duckdb_schemas.test",
+                "test/sql/table_function/duckdb_tables.test",
+                "test/sql/table_function/test_information_schema_columns.test",
+                "test/sql/settings/setting_preserve_identifier_case.test",
+                "test/sql/pragma/test_pragma_database_list.test",
+                "test/sql/index/art/storage/test_art_duckdb_versions.test",
+                "test/sql/copy_database/copy_database_with_index.test",
+                "test/sql/table_function/information_schema.test",
+                "test/sql/table_function/duckdb_databases.test",
+                "test/sql/table_function/sqlite_master.test",
+                "test/sql/table_function/duckdb_columns.test",
+                "test/sql/table_function/sqlite_master_quotes.test"
+            ]
+        }
+    ]
+}
diff --git a/test/sql/storage/attach_merge.test b/test/sql/storage/attach_merge.test
new file mode 100644
index 000000000..092b65354
--- /dev/null
+++ b/test/sql/storage/attach_merge.test
@@ -0,0 +1,101 @@
+# name: test/sql/storage/attach_merge.test
+# description: Test MERGE INTO
+# group: [storage]
+
+require postgres_scanner
+
+require-env POSTGRES_TEST_DATABASE_AVAILABLE
+
+statement ok
+PRAGMA enable_verification
+
+statement ok
+ATTACH 'dbname=postgresscanner' AS s (TYPE POSTGRES);
+
+statement ok
+USE s
+
+statement ok
+CREATE OR REPLACE TABLE Stock(item_id int, balance int);
+
+statement ok
+CREATE OR REPLACE TABLE Buy(item_id int, volume int);
+
+statement ok
+INSERT INTO Buy values(10, 1000);
+
+statement ok
+INSERT INTO Buy values(30, 300);
+
+# insert using merge into with a CTE
+query I
+WITH initial_stocks(item_id, balance) AS (VALUES (10, 2200), (20, 1900))
+MERGE INTO Stock USING initial_stocks ON FALSE
+WHEN MATCHED THEN DO NOTHING
+WHEN NOT MATCHED THEN INSERT VALUES (initial_stocks.item_id, initial_stocks.balance)
+----
+2
+
+# DO NOTHING is the default - this is a nop
+query I
+WITH initial_stocks(item_id, balance) AS (VALUES (10, 2200), (20, 1900))
+MERGE INTO Stock USING initial_stocks USING (item_id)
+WHEN NOT MATCHED THEN INSERT VALUES (item_id, initial_stocks.balance)
+----
+0
+
+query II
+FROM Stock ORDER BY item_id
+----
+10	2200
+20	1900
+
+# update and insert
+query I
+MERGE INTO Stock AS s USING Buy AS b ON s.item_id = b.item_id
+WHEN MATCHED THEN UPDATE SET balance = balance + b.volume
+WHEN NOT MATCHED THEN INSERT VALUES (b.item_id, b.volume)
+----
+2
+
+query II
+FROM Stock ORDER BY item_id
+----
+10	3200
+20	1900
+30	300
+
+# sell - deleting all rows that are fully sold
+statement ok
+CREATE OR REPLACE TABLE Sale(item_id int, volume int);
+
+statement ok
+INSERT INTO Sale VALUES (10, 2200);
+
+statement ok
+INSERT INTO Sale VALUES (20, 1900);
+
+query I
+MERGE INTO Stock USING Sale ON Stock.item_id = Sale.item_id
+WHEN MATCHED AND Sale.volume > balance THEN ERROR
+WHEN MATCHED AND Sale.volume = balance THEN DELETE
+WHEN MATCHED AND TRUE THEN UPDATE SET balance = balance - Sale.volume
+WHEN MATCHED THEN ERROR
+WHEN NOT MATCHED THEN ERROR
+----
+2
+
+query II
+FROM Stock ORDER BY item_id
+----
+10	1000
+30	300
+
+# abort - row does not exist
+statement error
+MERGE INTO Stock USING Sale ON Stock.item_id = Sale.item_id
+WHEN MATCHED AND Sale.volume >= balance THEN DELETE
+WHEN MATCHED THEN UPDATE SET balance = balance - Sale.volume
+WHEN NOT MATCHED THEN ERROR CONCAT('Sale item with item id ', Sale.item_id, ' not found');
+----
+Sale item with item id 20 not found
diff --git a/test/sql/storage/attach_on_conflict.test b/test/sql/storage/attach_on_conflict.test
new file mode 100644
index 000000000..101463a59
--- /dev/null
+++ b/test/sql/storage/attach_on_conflict.test
@@ -0,0 +1,200 @@
+# name: test/sql/storage/attach_on_conflict.test
+# group: [storage]
+
+require postgres_scanner
+
+require-env POSTGRES_TEST_DATABASE_AVAILABLE
+
+statement ok
+PRAGMA enable_verification
+
+statement ok
+ATTACH 'dbname=postgresscanner' AS s (TYPE POSTGRES);
+
+statement ok
+USE s
+
+statement ok
+PRAGMA enable_verification;
+
+statement ok
+CREATE OR REPLACE TABLE tbl(
+	i INT PRIMARY KEY,
+	j INT UNIQUE,
+	k INT
+);
+
+statement ok
+INSERT INTO tbl VALUES (1, 10, 1), (2, 20, 1), (3, 30, 2);
+
+# Update the on-conflict column.
+statement ok
+INSERT INTO tbl VALUES (3, 5, 1)
+ON CONFLICT (i) DO UPDATE SET i = i + 1;
+
+query III
+SELECT i, j, k FROM tbl ORDER BY ALL;
+----
+1	10	1
+2	20	1
+4	30	2
+
+query III
+SELECT i, j, k FROM tbl WHERE i = 4;
+----
+4	30	2
+
+# Update the on-conflict column again.
+statement ok
+INSERT INTO tbl VALUES (4, 30, 2)
+ON CONFLICT (i) DO UPDATE SET i = i - 1;
+
+query III
+SELECT i, j, k FROM tbl ORDER BY ALL;
+----
+1	10	1
+2	20	1
+3	30	2
+
+# Cannot update to the same PK value as another row.
+statement error
+INSERT INTO tbl VALUES (3, 30, 2)
+ON CONFLICT (i) DO UPDATE SET i = i - 2;
+----
+duplicate key value
+
+# 'excluded' refers to the VALUES list, turning this into:
+# (k) 2 + (excluded.k) 1 = 3
+statement ok
+insert into tbl VALUES
+	(3,5,1)
+ON CONFLICT (i) DO UPDATE SET k = k + excluded.k;
+
+query III rowsort
+select * from tbl;
+----
+1	10	1
+2	20	1
+3	30	3
+
+# The ON CONFLICT does not refer to a column that's indexed on, so it's never true
+statement error
+insert into tbl VALUES
+	(3,5,1)
+ON CONFLICT (k) DO UPDATE SET k = excluded.k;
+----
+Binder Error: The specified columns as conflict target are not referenced by a UNIQUE/PRIMARY KEY CONSTRAINT
+
+# Overwrite the existing value with the new value
+statement ok
+insert into tbl VALUES
+	(3,5,1)
+ON CONFLICT (i) DO UPDATE SET k = excluded.k;
+
+query III rowsort
+select * from tbl;
+----
+1	10	1
+2	20	1
+3	30	1
+
+# Don't alter the existing row, but still insert the non-conflicting row
+statement ok
+insert into tbl VALUES
+	(4,2,3),
+	(3,5,10)
+ON CONFLICT (i) DO NOTHING;
+
+query III rowsort
+select * from tbl;
+----
+1	10	1
+2	20	1
+3	30	1
+4	2	3
+
+# Two rows cause a conflict, on the same existing row
+# only the last one is used
+statement ok
+insert into tbl VALUES
+	(3,3,10),
+	(3,3,10)
+ON CONFLICT (i) DO UPDATE SET
+	k = excluded.k;
+
+query III rowsort
+select * from tbl order by all
+----
+1	10	1
+2	20	1
+3	30	10
+4	2	3
+
+# condition is not matched - no updates have happened
+query I
+insert into tbl VALUES (3,5,1) ON CONFLICT (i) DO UPDATE SET k = 1 WHERE k < 5;
+----
+0
+
+query III rowsort
+select * from tbl;
+----
+1	10	1
+2	20	1
+3	30	10
+4	2	3
+
+# When the condition is met, the DO is performed
+query I
+insert into tbl VALUES (3,5,1) ON CONFLICT (i) DO UPDATE SET k = 1 WHERE k >= 5;
+----
+1
+
+# 'k' in row_id:3 is updated to 1
+query III rowsort
+select * from tbl;
+----
+1	10	1
+2	20	1
+3	30	1
+4	2	3
+
+# When the condition is on the DO UPDATE part,
+# it will always succeed, but turn into a DO NOTHING for the conflicts that don't meet the condition
+
+statement ok
+insert into tbl VALUES (3,5,3) on conflict (i) do update set k = 10 WHERE k != 1;
+
+# Unchanged, because the where clause is not met
+query III rowsort
+select * from tbl;
+----
+1	10	1
+2	20	1
+3	30	1
+4	2	3
+
+statement ok
+insert into tbl VALUES (3,5,3) on conflict (i) do update set k = 10 WHERE k == 1;
+
+# Changed, because the where clause is met
+query III rowsort
+select * from tbl;
+----
+1	10	1
+2	20	1
+3	30	10
+4	2	3
+
+# When we don't specify a conflict target, all unique/primary key constraints are used as the conflict target
+statement ok
+insert into tbl VALUES (5,1,0), (3,5,20) ON CONFLICT DO NOTHING;
+
+query III rowsort
+select * from tbl;
+----
+1	10	1
+2	20	1
+3	30	10
+4	2	3
+5	1	0