Skip to content

Commit

Permalink
dbcopy: use sql copy to insert data instead of insert statements
Browse files Browse the repository at this point in the history
  • Loading branch information
franku committed Feb 27, 2020
1 parent d2d5f9c commit bf1f305
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 35 deletions.
2 changes: 1 addition & 1 deletion core/src/cats/bdb_dbi.h
Expand Up @@ -74,7 +74,7 @@ class BareosDbDBI : public BareosDbPrivateInterface {
bool SqlBatchEndFileTable(JobControlRecord* jcr, const char* error);
bool SqlBatchInsertFileTable(JobControlRecord* jcr, AttributesDbRecord* ar);
bool SqlCopyStart(const std::string& table_name,
const std::vector<std::string>& field_names);
const std::vector<std::string>& column_names);
bool SqlCopyInsert(const std::vector<ColumnData>& columns);
bool SqlCopyEnd();

Expand Down
2 changes: 1 addition & 1 deletion core/src/cats/bdb_mysql.h
Expand Up @@ -80,7 +80,7 @@ class BareosDbMysql : public BareosDbPrivateInterface {
bool SqlBatchInsertFileTable(JobControlRecord* jcr,
AttributesDbRecord* ar) override;
bool SqlCopyStart(const std::string& table_name,
const std::vector<std::string>& field_names) override;
const std::vector<std::string>& column_names) override;
bool SqlCopyInsert(const std::vector<ColumnData>& columns) override;
bool SqlCopyEnd() override;

Expand Down
2 changes: 1 addition & 1 deletion core/src/cats/bdb_postgresql.h
Expand Up @@ -79,7 +79,7 @@ class BareosDbPostgresql : public BareosDbPrivateInterface {
bool SqlBatchInsertFileTable(JobControlRecord* jcr,
AttributesDbRecord* ar) override;
bool SqlCopyStart(const std::string& table_name,
const std::vector<std::string>& field_names) override;
const std::vector<std::string>& column_names) override;
bool SqlCopyInsert(const std::vector<ColumnData>& columns) override;
bool SqlCopyEnd() override;

Expand Down
2 changes: 1 addition & 1 deletion core/src/cats/mysql_batch.cc
Expand Up @@ -140,7 +140,7 @@ bool BareosDbMysql::SqlBatchInsertFileTable(JobControlRecord* jcr,
}

bool BareosDbMysql::SqlCopyStart(const std::string& table_name,
const std::vector<std::string>& field_names)
const std::vector<std::string>& column_names)
{
return false;
}
Expand Down
127 changes: 96 additions & 31 deletions core/src/cats/postgresql_batch.cc
Expand Up @@ -238,65 +238,130 @@ bool BareosDbPostgresql::SqlBatchInsertFileTable(JobControlRecord* jcr,
return true;
}

class CleanupResult {
public:
CleanupResult(PGresult** r, int* s) : result(r), status(s) {}
void release() { do_cleanup = false; }

~CleanupResult()
{
if (do_cleanup) {
*status = 0;
PQclear(*result);
*result = nullptr;
}
}

private:
PGresult** result;
int* status;
bool do_cleanup{true};
};

bool BareosDbPostgresql::SqlCopyStart(
const std::string& table_name,
const std::vector<std::string>& column_names)
{
Dmsg0(500, "SqlCopyStart started\n");
CleanupResult result_cleanup(&result_, &status_);

/*
* We are starting a new query. reset everything.
*/
num_rows_ = -1;
row_number_ = -1;
field_number_ = -1;

SqlFreeResult();

std::string query_copy{"COPY " + table_name + " FROM STDIN"};
std::string query{"COPY " + table_name + " ("};

for (int i = 0; i < 10; i++) {
result_ = PQexec(db_handle_, query_copy.c_str());
if (result_) { break; }
Bmicrosleep(5, 0);
for (const auto& column_name : column_names) {
query += column_name;
query += ", ";
}
query.resize(query.size() - 2);
query += ") FROM STDIN (DELIMITER '\t')";

result_ = PQexec(db_handle_, query.c_str());
if (!result_) {
Dmsg1(50, "Query failed: %s\n", query);
goto bail_out;
Mmsg1(errmsg, _("error copying in batch mode: %s"),
PQerrorMessage(db_handle_));
return false;
}

status_ = PQresultStatus(result_);
if (status_ == PGRES_COPY_IN) {
/*
* How many fields in the set?
*/
num_fields_ = (int)PQnfields(result_);
num_rows_ = 0;
status_ = 1;
} else {
Dmsg1(50, "Result status failed: %s\n", query);
goto bail_out;
if (status_ != PGRES_COPY_IN) {
Mmsg1(errmsg, _("Result status failed: %s"), PQerrorMessage(db_handle_));
return false;
}

Dmsg0(500, "SqlBatchStartFileTable finishing\n");
std::size_t n = (int)PQnfields(result_);
if (n != column_names.size()) {
Mmsg1(errmsg, _("wrong number of rows: %d"), n);
return false;
}

return true;
num_rows_ = 0;
status_ = 1;

bail_out:
Mmsg1(errmsg, _("error starting batch mode: %s"), PQerrorMessage(db_handle_));
status_ = 0;
PQclear(result_);
result_ = NULL;
return false;
result_cleanup.release();
return true;
}

bool BareosDbPostgresql::SqlCopyInsert(const std::vector<ColumnData>& columns)
{
return false;
CleanupResult result_cleanup(&result_, &status_);

std::string query;

for (const auto& column : columns) {
query += column.data_pointer;
query += "\t";
}
query.resize(query.size() - 1);
query += "\n";

int res = 0;
int count = 30;

do {
res = PQputCopyData(db_handle_, query.data(), query.size());
} while (res == 0 && --count > 0);

if (res == 1) { status_ = 1; }

if (res <= 0) {
status_ = 0;
Mmsg1(errmsg, _("error copying in batch mode: %s"),
PQerrorMessage(db_handle_));
}
return true;
}

bool BareosDbPostgresql::SqlCopyEnd() { return false; }
bool BareosDbPostgresql::SqlCopyEnd()
{
int res;
int count = 30;

CleanupResult result_cleanup(&result_, &status_);

do {
res = PQputCopyEnd(db_handle_, nullptr);
} while (res == 0 && --count > 0);

if (res <= 0) {
Mmsg1(errmsg, _("error ending batch mode: %s"), PQerrorMessage(db_handle_));
return false;
}

status_ = 1;

result_ = PQgetResult(db_handle_);
if (PQresultStatus(result_) != PGRES_COMMAND_OK) {
Mmsg1(errmsg, _("error ending batch mode: %s"), PQerrorMessage(db_handle_));
return false;
}

result_cleanup.release();
return true;
}


#endif // HAVE_POSTGRESQL

0 comments on commit bf1f305

Please sign in to comment.