Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 74 additions & 45 deletions benchmark/insert-copy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,30 +17,18 @@
* under the License.
*/

#include <chrono>
#include <cstdlib>
#include <iostream>
#include <vector>
#include "insert.hh"

#include <libpq-fe.h>
#include <arpa/inet.h>

class ConnectionFinisher {
public:
ConnectionFinisher(PGconn* connection) : connection_(connection) {}
~ConnectionFinisher() { PQfinish(connection_); }

private:
PGconn* connection_;
};

class ResultClearner {
public:
ResultClearner(PGresult* result) : result_(result) {}
~ResultClearner() { PQclear(result_); }

private:
PGresult* result_;
};
namespace {
template <typename Type>
void
write_binary_data(std::ostream& stream, Type value)
{
stream.write(reinterpret_cast<const char*>(&value), sizeof(Type));
}
}; // namespace

int
main(int argc, char** argv)
Expand Down Expand Up @@ -78,33 +66,31 @@ main(int argc, char** argv)
}
}

std::vector<std::string> buffers;
std::vector<std::vector<Value>> records;
{
auto result = PQexec(connection, "COPY data TO STDOUT (FORMAT binary)");
auto result = PQexec(connection, "SELECT * FROM data");
ResultClearner resultClearner(result);
if (PQresultStatus(result) != PGRES_COPY_OUT)
if (PQresultStatus(result) != PGRES_TUPLES_OK)
{
std::cerr << "failed to copy to: " << PQerrorMessage(connection) << std::endl;
std::cerr << "failed to select: " << PQerrorMessage(connection) << std::endl;
return EXIT_FAILURE;
}
while (true)
auto nTuples = PQntuples(result);
auto nFields = PQnfields(result);
for (int iTuple = 0; iTuple < nTuples; iTuple++)
{
char* data;
auto size = PQgetCopyData(connection, &data, 0);
if (size == -1)
{
break;
}
if (size == -2)
std::vector<Value> values;
for (int iField = 0; iField < nFields; iField++)
{
std::cerr << "failed to read copy data: " << PQerrorMessage(connection)
<< std::endl;
return EXIT_FAILURE;
if (!append_value(values, result, iTuple, iField))
{
return EXIT_FAILURE;
}
}
buffers.emplace_back(data, size);
free(data);
records.push_back(std::move(values));
}
}

auto before = std::chrono::steady_clock::now();
{
auto result = PQexec(connection, "COPY data_insert FROM STDOUT (FORMAT binary)");
Expand All @@ -115,15 +101,58 @@ main(int argc, char** argv)
<< std::endl;
return EXIT_FAILURE;
}
for (const auto& buffer : buffers)
std::ostringstream copyDataStream;
{
auto copyDataResult = PQputCopyData(connection, buffer.data(), buffer.size());
if (copyDataResult == -1)
// See the "Binary Format" section in
// https://www.postgresql.org/docs/current/sql-copy.html for
// details.

const char signature[] = "PGCOPY\n\377\r\n";
// The last '\0' is also part of the signature.
copyDataStream << std::string_view(signature, sizeof(signature));
const uint32_t flags = 0;
write_binary_data(copyDataStream, htonl(flags));
const uint32_t headerExtensionAreaLength = 0;
write_binary_data(copyDataStream, htonl(headerExtensionAreaLength));
auto nRecords = records.size();
for (size_t iRecord = 0; iRecord < nRecords; ++iRecord)
{
std::cerr << "failed to put copy data: " << PQerrorMessage(connection)
<< std::endl;
return EXIT_FAILURE;
const auto& values = records[iRecord];
const auto nValues = values.size();
write_binary_data(copyDataStream, htons(nValues));
for (size_t iValue = 0; iValue < nValues; ++iValue)
{
const auto& value = values[iValue];
if (std::holds_alternative<std::monostate>(value))
{
write_binary_data(copyDataStream,
htonl(static_cast<uint32_t>(-1)));
}
else if (std::holds_alternative<int32_t>(value))
{
const auto& int32Value = std::get<int32_t>(value);
write_binary_data(copyDataStream, htonl(sizeof(int32_t)));
write_binary_data(copyDataStream,
htonl(static_cast<uint32_t>(int32Value)));
}
else if (std::holds_alternative<std::string>(value))
{
const auto& stringValue = std::get<std::string>(value);
write_binary_data(copyDataStream, htonl(stringValue.size()));
copyDataStream << stringValue;
}
}
}
const uint16_t fileTrailer = -1;
write_binary_data(copyDataStream, htons(fileTrailer));
}
const auto& copyData = copyDataStream.str();
auto copyDataResult = PQputCopyData(connection, copyData.data(), copyData.size());
if (copyDataResult == -1)
{
std::cerr << "failed to put copy data: " << PQerrorMessage(connection)
<< std::endl;
return EXIT_FAILURE;
}
auto copyEndResult = PQputCopyEnd(connection, nullptr);
if (copyEndResult == -1)
Expand Down
92 changes: 44 additions & 48 deletions benchmark/insert.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,31 +17,7 @@
* under the License.
*/

#include <chrono>
#include <cstdlib>
#include <iostream>

#include <libpq-fe.h>

#include <catalog/pg_type_d.h>

class ConnectionFinisher {
public:
ConnectionFinisher(PGconn* connection) : connection_(connection) {}
~ConnectionFinisher() { PQfinish(connection_); }

private:
PGconn* connection_;
};

class ResultClearner {
public:
ResultClearner(PGresult* result) : result_(result) {}
~ResultClearner() { PQclear(result_); }

private:
PGresult* result_;
};
#include "insert.hh"

int
main(int argc, char** argv)
Expand Down Expand Up @@ -79,7 +55,7 @@ main(int argc, char** argv)
}
}

std::string insert = "INSERT INTO data_insert VALUES ";
std::vector<std::vector<Value>> records;
{
auto result = PQexec(connection, "SELECT * FROM data");
ResultClearner resultClearner(result);
Expand All @@ -92,37 +68,57 @@ main(int argc, char** argv)
auto nFields = PQnfields(result);
for (int iTuple = 0; iTuple < nTuples; iTuple++)
{
if (iTuple > 0)
{
insert += ", ";
}
std::vector<Value> values;
for (int iField = 0; iField < nFields; iField++)
{
if (PQgetisnull(result, iTuple, iField))
{
insert += "(null)";
}
else
if (!append_value(values, result, iTuple, iField))
{
insert += "(";
auto type = PQftype(result, iField);
if (type == TEXTOID)
{
insert += "'";
}
insert += PQgetvalue(result, iTuple, iField);
if (type == TEXTOID)
{
insert += "'";
}
insert += ")";
return EXIT_FAILURE;
}
}
records.push_back(std::move(values));
}
}

auto before = std::chrono::steady_clock::now();
{
auto result = PQexec(connection, insert.c_str());
std::ostringstream insert;
insert << "INSERT INTO data_insert VALUES ";
auto nRecords = records.size();
for (size_t iRecord = 0; iRecord < nRecords; ++iRecord)
{
const auto& values = records[iRecord];
if (iRecord > 0)
{
insert << ", ";
}
insert << "(";
auto nValues = values.size();
for (size_t iValue = 0; iValue < nValues; ++iValue)
{
if (iValue > 0)
{
insert << ", ";
}
const auto& value = values[iValue];
if (std::holds_alternative<std::monostate>(value))
{
insert << "null";
}
else if (std::holds_alternative<int32_t>(value))
{
insert << std::get<int32_t>(value);
}
else if (std::holds_alternative<std::string>(value))
{
insert << "'";
insert << std::get<std::string>(value);
insert << "'";
}
}
insert << ")";
}
auto result = PQexec(connection, insert.str().c_str());
ResultClearner resultClearner(result);
if (PQresultStatus(result) != PGRES_COMMAND_OK)
{
Expand Down
92 changes: 92 additions & 0 deletions benchmark/insert.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#pragma once

#include <charconv>
#include <chrono>
#include <cstdlib>
#include <iostream>
#include <sstream>
#include <variant>
#include <vector>

#include <libpq-fe.h>

#include <catalog/pg_type_d.h>

namespace {
class ConnectionFinisher {
public:
ConnectionFinisher(PGconn* connection) : connection_(connection) {}
~ConnectionFinisher() { PQfinish(connection_); }

private:
PGconn* connection_;
};

class ResultClearner {
public:
ResultClearner(PGresult* result) : result_(result) {}
~ResultClearner() { PQclear(result_); }

private:
PGresult* result_;
};

using Value = std::variant<std::monostate, int32_t, std::string>;

bool
append_value(std::vector<Value>& values, PGresult* result, int iTuple, int iField)
{
if (PQgetisnull(result, iTuple, iField))
{
values.push_back(std::monostate{});
return true;
}

Oid type = PQftype(result, iField);
char* rawValue = PQgetvalue(result, iTuple, iField);
int length = PQgetlength(result, iTuple, iField);
switch (type)
{
case INT4OID:
{
int32_t value;
auto result = std::from_chars(rawValue, rawValue + length, value);
if (result.ec != std::errc{})
{
std::cerr << "failed to parse integer value: " << rawValue << std::endl;
return false;
}
values.emplace_back(value);
}
break;
case TEXTOID:
{
values.emplace_back(std::string(rawValue, length));
}
break;
default:
std::cerr << "unsupported type: " << type << std::endl;
return false;
}
return true;
}
}; // namespace
Loading