Skip to content

Commit

Permalink
[C] Add nanoarrow-based libpq driver
Browse files Browse the repository at this point in the history
  • Loading branch information
lidavidm committed Aug 26, 2022
1 parent e3c0c10 commit e5c1751
Show file tree
Hide file tree
Showing 14 changed files with 2,190 additions and 29 deletions.
71 changes: 71 additions & 0 deletions c/drivers/postgres/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

cmake_minimum_required(VERSION 3.14)
get_filename_component(REPOSITORY_ROOT "../../../" ABSOLUTE)
list(APPEND CMAKE_MODULE_PATH "${REPOSITORY_ROOT}/c/cmake_modules/")
include(AdbcDefines)
include(BuildUtils)

project(adbc_driver_postgres
VERSION "${ADBC_BASE_VERSION}"
LANGUAGES CXX)
include(CTest)
find_package(PkgConfig)

pkg_check_modules(LIBPQ REQUIRED libpq)

add_arrow_lib(adbc_driver_postgres
SOURCES
connection.cc
database.cc
postgres.cc
statement.cc
type.cc
${REPOSITORY_ROOT}/c/vendor/nanoarrow/nanoarrow.c
OUTPUTS
ADBC_LIBRARIES
SHARED_LINK_LIBS
${LIBPQ_LIBRARIES})
include_directories(SYSTEM ${REPOSITORY_ROOT})
include_directories(SYSTEM ${REPOSITORY_ROOT}/c/)
include_directories(SYSTEM ${REPOSITORY_ROOT}/c/vendor/nanoarrow/)
include_directories(SYSTEM ${LIBPQ_INCLUDE_DIRS})
foreach(LIB_TARGET ${ADBC_LIBRARIES})
target_compile_definitions(${LIB_TARGET} PRIVATE ADBC_EXPORTING)
endforeach()

if(ADBC_TEST_LINKAGE STREQUAL "shared")
set(TEST_LINK_LIBS adbc_driver_postgres_shared)
else()
set(TEST_LINK_LIBS adbc_driver_postgres_static)
endif()

if(ADBC_BUILD_TESTS)
add_test_case(driver_postgres_test
PREFIX
adbc
SOURCES
postgres_test.cc
../../validation/adbc_validation.c
${REPOSITORY_ROOT}/c/vendor/nanoarrow/nanoarrow.c
EXTRA_LINK_LIBS
${TEST_LINK_LIBS})
endif()

validate_config()
config_summary_message()
25 changes: 25 additions & 0 deletions c/drivers/postgres/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
<!---
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# libpq ADBC Driver

With credit to 0x0L's [pgeon](https://github.com/0x0L/pgeon) for the overall approach.

This implements an ADBC driver that wraps [libpq](https://www.postgresql.org/docs/14/libpq.html).
This is still a work in progress.
118 changes: 118 additions & 0 deletions c/drivers/postgres/connection.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "connection.h"

#include <cstring>
#include <memory>

#include <adbc.h>

#include "database.h"
#include "util.h"

namespace adbcpq {
AdbcStatusCode PostgresConnection::Commit(struct AdbcError* error) {
if (autocommit_) {
SetError(error, "Cannot commit when autocommit is enabled");
return ADBC_STATUS_INVALID_STATE;
}

PGresult* result = PQexec(conn_, "COMMIT");
if (PQresultStatus(result) != PGRES_COMMAND_OK) {
SetError(error, "Failed to commit: ", PQerrorMessage(conn_));
PQclear(result);
return ADBC_STATUS_IO;
}
PQclear(result);
return ADBC_STATUS_OK;
}

AdbcStatusCode PostgresConnection::GetTableSchema(const char* catalog,
const char* db_schema,
const char* table_name,
struct ArrowSchema* schema,
struct AdbcError* error) {
return ADBC_STATUS_NOT_IMPLEMENTED;
}

AdbcStatusCode PostgresConnection::Init(struct AdbcDatabase* database,
struct AdbcError* error) {
if (!database || !database->private_data) {
SetError(error, "Must provide an initialized AdbcDatabase");
return ADBC_STATUS_INVALID_ARGUMENT;
}
database_ =
*reinterpret_cast<std::shared_ptr<PostgresDatabase>*>(database->private_data);
type_mapping_ = database_->type_mapping();
return database_->Connect(&conn_, error);
}

AdbcStatusCode PostgresConnection::Release(struct AdbcError* error) {
if (conn_) {
return database_->Disconnect(&conn_, error);
}
return ADBC_STATUS_OK;
}

AdbcStatusCode PostgresConnection::Rollback(struct AdbcError* error) {
if (autocommit_) {
SetError(error, "Cannot rollback when autocommit is enabled");
return ADBC_STATUS_INVALID_STATE;
}

PGresult* result = PQexec(conn_, "ROLLBACK");
if (PQresultStatus(result) != PGRES_COMMAND_OK) {
SetError(error, "Failed to rollback: ", PQerrorMessage(conn_));
PQclear(result);
return ADBC_STATUS_IO;
}
PQclear(result);
return ADBC_STATUS_OK;
}

AdbcStatusCode PostgresConnection::SetOption(const char* key, const char* value,
struct AdbcError* error) {
if (std::strcmp(key, ADBC_CONNECTION_OPTION_AUTOCOMMIT) == 0) {
bool autocommit = true;
if (std::strcmp(value, ADBC_OPTION_VALUE_ENABLED) == 0) {
autocommit = true;
} else if (std::strcmp(value, ADBC_OPTION_VALUE_DISABLED) == 0) {
autocommit = false;
} else {
SetError(error, "Invalid value for option ", key, ": ", value);
return ADBC_STATUS_INVALID_ARGUMENT;
}

if (autocommit != autocommit_) {
const char* query = autocommit ? "COMMIT" : "BEGIN TRANSACTION";

PGresult* result = PQexec(conn_, query);
if (PQresultStatus(result) != PGRES_COMMAND_OK) {
SetError(error, "Failed to update autocommit: ", PQerrorMessage(conn_));
PQclear(result);
return ADBC_STATUS_IO;
}
PQclear(result);
autocommit_ = autocommit;
}
return ADBC_STATUS_OK;
}
SetError(error, "Unknown option ", key);
return ADBC_STATUS_NOT_IMPLEMENTED;
}
} // namespace adbcpq
50 changes: 50 additions & 0 deletions c/drivers/postgres/connection.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <cstdint>
#include <memory>

#include <adbc.h>
#include <libpq-fe.h>

#include "type.h"

namespace adbcpq {
class PostgresDatabase;
class PostgresConnection {
public:
PostgresConnection() : database_(nullptr), conn_(nullptr), autocommit_(true) {}

AdbcStatusCode Commit(struct AdbcError* error);
AdbcStatusCode GetTableSchema(const char* catalog, const char* db_schema,
const char* table_name, struct ArrowSchema* schema,
struct AdbcError* error);
AdbcStatusCode Init(struct AdbcDatabase* database, struct AdbcError* error);
AdbcStatusCode Release(struct AdbcError* error);
AdbcStatusCode Rollback(struct AdbcError* error);
AdbcStatusCode SetOption(const char* key, const char* value, struct AdbcError* error);

PGconn* conn() const { return conn_; }
const std::shared_ptr<TypeMapping>& type_mapping() const { return type_mapping_; }

private:
std::shared_ptr<PostgresDatabase> database_;
std::shared_ptr<TypeMapping> type_mapping_;
PGconn* conn_;
bool autocommit_;
};
} // namespace adbcpq
124 changes: 124 additions & 0 deletions c/drivers/postgres/database.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "database.h"

#include <cstring>
#include <memory>

#include <adbc.h>
#include <libpq-fe.h>
#include <nanoarrow.h>

#include "util.h"

namespace adbcpq {

PostgresDatabase::PostgresDatabase() : open_connections_(0) {
type_mapping_ = std::make_shared<TypeMapping>();
}
PostgresDatabase::~PostgresDatabase() = default;

AdbcStatusCode PostgresDatabase::Init(struct AdbcError* error) {
// Connect to validate the parameters.
PGconn* conn = nullptr;
AdbcStatusCode final_status = Connect(&conn, error);
if (final_status != ADBC_STATUS_OK) {
return final_status;
}

// Build the type mapping table.
const std::string kTypeQuery = R"(
SELECT
oid,
typname,
typreceive
FROM
pg_catalog.pg_type
)";

pg_result* result = PQexec(conn, kTypeQuery.c_str());
ExecStatusType pq_status = PQresultStatus(result);
if (pq_status == PGRES_TUPLES_OK) {
int num_rows = PQntuples(result);
for (int row = 0; row < num_rows; row++) {
const uint32_t oid = static_cast<uint32_t>(
std::strtol(PQgetvalue(result, row, 0), /*str_end=*/nullptr, /*base=*/10));
const char* typname = PQgetvalue(result, row, 1);
const char* typreceive = PQgetvalue(result, row, 2);

type_mapping_->Insert(oid, typname, typreceive);
}
} else {
SetError(error, "Failed to execute build type mapping table: ", PQerrorMessage(conn));
final_status = ADBC_STATUS_IO;
}
PQclear(result);

// Disconnect since Postgres connections can be heavy.
{
AdbcStatusCode status = Disconnect(&conn, error);
if (status != ADBC_STATUS_OK) final_status = status;
}
return final_status;
}

AdbcStatusCode PostgresDatabase::Release(struct AdbcError* error) {
if (open_connections_ != 0) {
SetError(error, "Database released with ", open_connections_, " open connections");
return ADBC_STATUS_INVALID_STATE;
}
return ADBC_STATUS_OK;
}

AdbcStatusCode PostgresDatabase::SetOption(const char* key, const char* value,
struct AdbcError* error) {
if (strcmp(key, "uri") == 0) {
uri_ = value;
} else {
SetError(error, "Unknown database option ", key);
return ADBC_STATUS_NOT_IMPLEMENTED;
}
return ADBC_STATUS_OK;
}

AdbcStatusCode PostgresDatabase::Connect(PGconn** conn, struct AdbcError* error) {
if (uri_.empty()) {
SetError(error, "Must set database option 'uri' before creating a connection");
return ADBC_STATUS_INVALID_STATE;
}
*conn = PQconnectdb(uri_.c_str());
if (PQstatus(*conn) != CONNECTION_OK) {
SetError(error, "Failed to connect: ", PQerrorMessage(*conn));
PQfinish(*conn);
*conn = nullptr;
return ADBC_STATUS_IO;
}
open_connections_++;
return ADBC_STATUS_OK;
}

AdbcStatusCode PostgresDatabase::Disconnect(PGconn** conn, struct AdbcError* error) {
PQfinish(*conn);
*conn = nullptr;
if (--open_connections_ < 0) {
SetError(error, "Open connection count underflowed");
return ADBC_STATUS_INTERNAL;
}
return ADBC_STATUS_OK;
}
} // namespace adbcpq
Loading

0 comments on commit e5c1751

Please sign in to comment.