Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/Linux.yml
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ jobs:
aarch64_cross_compile: 1

- name: Setup vcpkg
uses: lukka/run-vcpkg@v11
uses: lukka/run-vcpkg@v11.1
with:
vcpkgGitCommitId: 501db0f17ef6df184fcdbfbe0f87cde2313b6ab1

Expand Down
1 change: 1 addition & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ add_library(
postgres_copy_to.cpp
postgres_extension.cpp
postgres_filter_pushdown.cpp
postgres_query.cpp
postgres_scanner.cpp
postgres_storage.cpp
postgres_utils.cpp)
Expand Down
7 changes: 6 additions & 1 deletion src/include/postgres_scanner.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ struct PostgresBindData : public FunctionData {

string schema_name;
string table_name;
string sql;
idx_t pages_approx = 0;

vector<PostgresType> postgres_types;
Expand All @@ -37,7 +38,6 @@ struct PostgresBindData : public FunctionData {
idx_t max_threads = 1;

PostgresConnection connection;
optional_ptr<PostgresTransaction> transaction;
PostgresConnectionReservation connection_reservation;

public:
Expand Down Expand Up @@ -73,4 +73,9 @@ class PostgresClearCacheFunction : public TableFunction {
PostgresClearCacheFunction();
};

class PostgresQueryFunction : public TableFunction {
public:
PostgresQueryFunction();
};

} // namespace duckdb
192 changes: 192 additions & 0 deletions src/include/postgres_type_oids.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
// taken from pg_type_d.h
#pragma once

#define BOOLOID 16
#define BYTEAOID 17
#define CHAROID 18
#define NAMEOID 19
#define INT8OID 20
#define INT2OID 21
#define INT2VECTOROID 22
#define INT4OID 23
#define REGPROCOID 24
#define TEXTOID 25
#define OIDOID 26
#define TIDOID 27
#define XIDOID 28
#define CIDOID 29
#define OIDVECTOROID 30
#define JSONOID 114
#define XMLOID 142
#define PG_NODE_TREEOID 194
#define PG_NDISTINCTOID 3361
#define PG_DEPENDENCIESOID 3402
#define PG_MCV_LISTOID 5017
#define PG_DDL_COMMANDOID 32
#define XID8OID 5069
#define POINTOID 600
#define LSEGOID 601
#define PATHOID 602
#define BOXOID 603
#define POLYGONOID 604
#define LINEOID 628
#define FLOAT4OID 700
#define FLOAT8OID 701
#define UNKNOWNOID 705
#define CIRCLEOID 718
#define MONEYOID 790
#define MACADDROID 829
#define INETOID 869
#define CIDROID 650
#define MACADDR8OID 774
#define ACLITEMOID 1033
#define BPCHAROID 1042
#define VARCHAROID 1043
#define DATEOID 1082
#define TIMEOID 1083
#define TIMESTAMPOID 1114
#define TIMESTAMPTZOID 1184
#define INTERVALOID 1186
#define TIMETZOID 1266
#define BITOID 1560
#define VARBITOID 1562
#define NUMERICOID 1700
#define REFCURSOROID 1790
#define REGPROCEDUREOID 2202
#define REGOPEROID 2203
#define REGOPERATOROID 2204
#define REGCLASSOID 2205
#define REGCOLLATIONOID 4191
#define REGTYPEOID 2206
#define REGROLEOID 4096
#define REGNAMESPACEOID 4089
#define UUIDOID 2950
#define PG_LSNOID 3220
#define TSVECTOROID 3614
#define GTSVECTOROID 3642
#define TSQUERYOID 3615
#define REGCONFIGOID 3734
#define REGDICTIONARYOID 3769
#define JSONBOID 3802
#define JSONPATHOID 4072
#define TXID_SNAPSHOTOID 2970
#define PG_SNAPSHOTOID 5038
#define INT4RANGEOID 3904
#define NUMRANGEOID 3906
#define TSRANGEOID 3908
#define TSTZRANGEOID 3910
#define DATERANGEOID 3912
#define INT8RANGEOID 3926
#define INT4MULTIRANGEOID 4451
#define NUMMULTIRANGEOID 4532
#define TSMULTIRANGEOID 4533
#define TSTZMULTIRANGEOID 4534
#define DATEMULTIRANGEOID 4535
#define INT8MULTIRANGEOID 4536
#define RECORDOID 2249
#define RECORDARRAYOID 2287
#define CSTRINGOID 2275
#define ANYOID 2276
#define ANYARRAYOID 2277
#define VOIDOID 2278
#define TRIGGEROID 2279
#define EVENT_TRIGGEROID 3838
#define LANGUAGE_HANDLEROID 2280
#define INTERNALOID 2281
#define ANYELEMENTOID 2283
#define ANYNONARRAYOID 2776
#define ANYENUMOID 3500
#define FDW_HANDLEROID 3115
#define INDEX_AM_HANDLEROID 325
#define TSM_HANDLEROID 3310
#define TABLE_AM_HANDLEROID 269
#define ANYRANGEOID 3831
#define ANYCOMPATIBLEOID 5077
#define ANYCOMPATIBLEARRAYOID 5078
#define ANYCOMPATIBLENONARRAYOID 5079
#define ANYCOMPATIBLERANGEOID 5080
#define ANYMULTIRANGEOID 4537
#define ANYCOMPATIBLEMULTIRANGEOID 4538
#define PG_BRIN_BLOOM_SUMMARYOID 4600
#define PG_BRIN_MINMAX_MULTI_SUMMARYOID 4601
#define BOOLARRAYOID 1000
#define BYTEAARRAYOID 1001
#define CHARARRAYOID 1002
#define NAMEARRAYOID 1003
#define INT8ARRAYOID 1016
#define INT2ARRAYOID 1005
#define INT2VECTORARRAYOID 1006
#define INT4ARRAYOID 1007
#define REGPROCARRAYOID 1008
#define TEXTARRAYOID 1009
#define OIDARRAYOID 1028
#define TIDARRAYOID 1010
#define XIDARRAYOID 1011
#define CIDARRAYOID 1012
#define OIDVECTORARRAYOID 1013
#define PG_TYPEARRAYOID 210
#define PG_ATTRIBUTEARRAYOID 270
#define PG_PROCARRAYOID 272
#define PG_CLASSARRAYOID 273
#define JSONARRAYOID 199
#define XMLARRAYOID 143
#define XID8ARRAYOID 271
#define POINTARRAYOID 1017
#define LSEGARRAYOID 1018
#define PATHARRAYOID 1019
#define BOXARRAYOID 1020
#define POLYGONARRAYOID 1027
#define LINEARRAYOID 629
#define FLOAT4ARRAYOID 1021
#define FLOAT8ARRAYOID 1022
#define CIRCLEARRAYOID 719
#define MONEYARRAYOID 791
#define MACADDRARRAYOID 1040
#define INETARRAYOID 1041
#define CIDRARRAYOID 651
#define MACADDR8ARRAYOID 775
#define ACLITEMARRAYOID 1034
#define BPCHARARRAYOID 1014
#define VARCHARARRAYOID 1015
#define DATEARRAYOID 1182
#define TIMEARRAYOID 1183
#define TIMESTAMPARRAYOID 1115
#define TIMESTAMPTZARRAYOID 1185
#define INTERVALARRAYOID 1187
#define TIMETZARRAYOID 1270
#define BITARRAYOID 1561
#define VARBITARRAYOID 1563
#define NUMERICARRAYOID 1231
#define REFCURSORARRAYOID 2201
#define REGPROCEDUREARRAYOID 2207
#define REGOPERARRAYOID 2208
#define REGOPERATORARRAYOID 2209
#define REGCLASSARRAYOID 2210
#define REGCOLLATIONARRAYOID 4192
#define REGTYPEARRAYOID 2211
#define REGROLEARRAYOID 4097
#define REGNAMESPACEARRAYOID 4090
#define UUIDARRAYOID 2951
#define PG_LSNARRAYOID 3221
#define TSVECTORARRAYOID 3643
#define GTSVECTORARRAYOID 3644
#define TSQUERYARRAYOID 3645
#define REGCONFIGARRAYOID 3735
#define REGDICTIONARYARRAYOID 3770
#define JSONBARRAYOID 3807
#define JSONPATHARRAYOID 4073
#define TXID_SNAPSHOTARRAYOID 2949
#define PG_SNAPSHOTARRAYOID 5039
#define INT4RANGEARRAYOID 3905
#define NUMRANGEARRAYOID 3907
#define TSRANGEARRAYOID 3909
#define TSTZRANGEARRAYOID 3911
#define DATERANGEARRAYOID 3913
#define INT8RANGEARRAYOID 3927
#define INT4MULTIRANGEARRAYOID 6150
#define NUMMULTIRANGEARRAYOID 6151
#define TSMULTIRANGEARRAYOID 6152
#define TSTZMULTIRANGEARRAYOID 6153
#define DATEMULTIRANGEARRAYOID 6155
#define INT8MULTIRANGEARRAYOID 6157
#define CSTRINGARRAYOID 1263
1 change: 1 addition & 0 deletions src/include/postgres_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class PostgresUtils {
optional_ptr<PostgresSchemaEntry> schema, const PostgresTypeData &input,
PostgresType &postgres_type);
static string TypeToString(const LogicalType &input);
static string PostgresOidToName(uint32_t oid);
static uint32_t ToPostgresOid(const LogicalType &input);
static bool SupportedPostgresOid(const LogicalType &input);
static LogicalType RemoveAlias(const LogicalType &type);
Expand Down
3 changes: 3 additions & 0 deletions src/postgres_extension.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ static void LoadInternal(DatabaseInstance &db) {
PostgresClearCacheFunction clear_cache_func;
ExtensionUtil::RegisterFunction(db, clear_cache_func);

PostgresQueryFunction query_func;
ExtensionUtil::RegisterFunction(db, query_func);

auto &config = DBConfig::GetConfig(db);
config.storage_extensions["postgres_scanner"] = make_uniq<PostgresStorageExtension>();

Expand Down
86 changes: 86 additions & 0 deletions src/postgres_query.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
#include "duckdb.hpp"

#include "duckdb/parser/parsed_data/create_table_function_info.hpp"
#include "postgres_scanner.hpp"
#include "duckdb/main/database_manager.hpp"
#include "duckdb/main/attached_database.hpp"
#include "storage/postgres_catalog.hpp"
#include "storage/postgres_transaction.hpp"

namespace duckdb {

static unique_ptr<FunctionData> PGQueryBind(ClientContext &context, TableFunctionBindInput &input,
vector<LogicalType> &return_types, vector<string> &names) {
auto result = make_uniq<PostgresBindData>();

// look up the database to query
auto db_name = input.inputs[0].GetValue<string>();
auto &db_manager = DatabaseManager::Get(context);
auto db = db_manager.GetDatabase(context, db_name);
if (!db) {
throw BinderException("Failed to find attached database \"%s\" referenced in postgres_query", db_name);
}
auto &catalog = db->GetCatalog();
if (catalog.GetCatalogType() != "postgres") {
throw BinderException("Attached database \"%s\" does not refer to a Postgres database", db_name);
}
auto &pg_catalog = catalog.Cast<PostgresCatalog>();
auto &transaction = Transaction::Get(context, catalog).Cast<PostgresTransaction>();
auto sql = input.inputs[1].GetValue<string>();

auto &con = transaction.GetConnection();
auto conn = con.GetConn();
// prepare execution of the query to figure out the result types and names
auto prepared = PQprepare(conn, "", sql.c_str(), 0, nullptr);
PostgresResult prepared_wrapper(prepared);
if (!prepared) {
throw BinderException("Failed to prepare query \"%s\" (no result returned): %s", sql, PQerrorMessage(conn));
}
if (PQresultStatus(prepared) != PGRES_COMMAND_OK) {
throw BinderException("Failed to prepare query \"%s\": %s", sql, PQresultErrorMessage(prepared));
}
// use describe_prepared
auto describe_prepared = PQdescribePrepared(conn, "");
PostgresResult describe_wrapper(describe_prepared);
if (!describe_prepared || PQresultStatus(describe_prepared) != PGRES_COMMAND_OK) {
auto extended_err = describe_prepared ? PQresultErrorMessage(describe_prepared) : PQerrorMessage(conn);
throw BinderException("Failed to describe prepared statement: %s", extended_err);
}
auto nfields = PQnfields(describe_prepared);
if (nfields <= 0) {
throw BinderException("No fields returned by query \"%s\" - the query must be a SELECT statement that returns at least one column", sql);
}
for(idx_t c = 0; c < nfields; c++) {
PostgresType postgres_type;
postgres_type.oid = PQftype(describe_prepared, c);
PostgresTypeData type_data;
type_data.type_name = PostgresUtils::PostgresOidToName(postgres_type.oid);
type_data.type_modifier = PQfmod(describe_prepared, c);
auto converted_type = PostgresUtils::TypeToLogicalType(nullptr, nullptr, type_data, postgres_type);
result->postgres_types.push_back(postgres_type);
return_types.emplace_back(converted_type);
names.emplace_back(PQfname(describe_prepared, c));
}

// set up the bind data
result->dsn = con.GetDSN();
result->connection = PostgresConnection(con.GetConnection());
result->types = return_types;
result->names = names;
result->read_only = false;
result->in_recovery = true;
result->SetTablePages(0);
result->sql = std::move(sql);
return std::move(result);
}

PostgresQueryFunction::PostgresQueryFunction()
: TableFunction("postgres_query", {LogicalType::VARCHAR, LogicalType::VARCHAR},
nullptr, PGQueryBind) {
PostgresScanFunction scan_function;
init_global = scan_function.init_global;
init_local = scan_function.init_local;
function = scan_function.function;
projection_pushdown = true;
}
}
Loading