Skip to content

Commit

Permalink
dbcopy: compare
Browse files Browse the repository at this point in the history
  • Loading branch information
franku committed Jan 31, 2020
1 parent 04ad52f commit f340379
Show file tree
Hide file tree
Showing 12 changed files with 284 additions and 75 deletions.
6 changes: 6 additions & 0 deletions core/src/dird/dbconvert/database_column_descriptions.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "cats/cats.h"
#include "dird/dbconvert/database_column_descriptions.h"

#include <algorithm>
#include <iostream>

DatabaseColumnDescriptions::DatabaseColumnDescriptions(BareosDb* db) : db_{db}
Expand All @@ -39,6 +40,11 @@ void DatabaseColumnDescriptions::SelectTableDescriptions(
err += sql_query;
throw std::runtime_error(err);
}
std::sort(column_descriptions.begin(), column_descriptions.end(),
[](std::unique_ptr<ColumnDescription>& v1,
std::unique_ptr<ColumnDescription>& v2) {
return v1->column_name < v2->column_name;
});
}

int DatabaseColumnDescriptionsPostgresql::ResultHandler(void* ctx,
Expand Down
2 changes: 0 additions & 2 deletions core/src/dird/dbconvert/database_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@
#include "include/jcr.h"
#include "lib/parse_conf.h"

#include <map>

class BareosDb;
class JobControlRecord;

Expand Down
11 changes: 8 additions & 3 deletions core/src/dird/dbconvert/database_export.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,14 @@ class DatabaseExport {
const DatabaseConnection& db_connection,
bool clear_tables = false);

virtual void Start() = 0;
virtual void operator<<(const RowData& data) = 0;
virtual void End() = 0;
virtual void StartTable() = 0;
virtual void EndTable() = 0;

virtual void CopyStart() = 0;
virtual void CopyRow(const RowData& data) = 0;
virtual void CopyEnd() = 0;

virtual void CompareRow(const RowData& data) = 0;

protected:
BareosDb* db_{};
Expand Down
180 changes: 133 additions & 47 deletions core/src/dird/dbconvert/database_export_postgresql.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,29 +52,32 @@ DatabaseExportPostgresql::DatabaseExportPostgresql(
}
}

DatabaseExportPostgresql::~DatabaseExportPostgresql() = default;
DatabaseExportPostgresql::~DatabaseExportPostgresql()
{
if (transaction_) { db_->SqlQuery("ROLLBACK"); }
}

void DatabaseExportPostgresql::operator<<(const RowData& data)
void DatabaseExportPostgresql::CopyRow(const RowData& data)
{
std::string query{"INSERT INTO "};
query += data.table_name;
query += " (";
for (const auto& r : data.row) {
query += r.first;
for (const auto& c : data.column_descriptions) {
query += c->column_name;
query += ", ";
}
query.erase(query.end() - 2);
query.resize(query.size() - 2);
query += ")";

query += " VALUES (";

for (const auto& r : data.row) {
query += "'";
query += r.second.data_pointer;
query += r.data_pointer;
query += "',";
}

query.erase(query.end() - 1);
query.resize(query.size() - 1);
query += ")";
#if 0
std::cout << query << std::endl;
Expand All @@ -87,71 +90,154 @@ void DatabaseExportPostgresql::operator<<(const RowData& data)
#endif
}

void DatabaseExportPostgresql::Start()
void DatabaseExportPostgresql::CopyStart() { SelectSequenceSchema(); }

static void UpdateSequences(
BareosDb* db,
const DatabaseExportPostgresql::SequenceSchemaVector&
sequence_schema_vector)
{
if (!db_->SqlQuery("BEGIN")) { throw std::runtime_error(db_->get_errmsg()); }
for (const auto& s : sequence_schema_vector) {
std::string sequence_schema_query{"select setval(' "};
sequence_schema_query += s.sequence_name;
sequence_schema_query += "', (select max(";
sequence_schema_query += s.column_name;
sequence_schema_query += ") from ";
sequence_schema_query += s.table_name;
sequence_schema_query += "))";
if (!db->SqlQuery(sequence_schema_query.c_str())) {
throw std::runtime_error(
"DatabaseExportPostgresql: Could not set sequence");
}
#if 0
std::cout << "Updating sequence for table: " << s.table_name <<
std::endl;
#endif
}
}

struct SequenceSchema {
std::string table_name;
std::string column_name;
std::string sequence_name;
};
void DatabaseExportPostgresql::CopyEnd()
{
UpdateSequences(db_, sequence_schema_vector_);
}

void DatabaseExportPostgresql::CursorStartTable(const std::string& table_name)
{
const DatabaseTableDescriptions::TableDescription* table{
table_descriptions_->GetTableDescription(table_name)};

if (table == nullptr) {
std::string err{
"DatabaseExportPostgresql: Could not get table description for: "};
err += table_name;
throw std::runtime_error(err);
}

using SequenceSchemaVector = std::vector<SequenceSchema>;
std::string query{"DECLARE curs1 NO SCROLL CURSOR FOR SELECT "};

int DatabaseExportPostgresql::ResultHandler(void* ctx, int fields, char** row)
for (const auto& c : table->column_descriptions) {
query += c->column_name;
query += ", ";
}

query.resize(query.size() - 2);
query += " FROM ";
query += table_name;

if (!db_->SqlQuery(query.c_str())) {
std::string err{
"DatabaseExportPostgresql (cursor): Could not execute query: "};
err += db_->get_errmsg();
throw std::runtime_error(err);
}
}

void DatabaseExportPostgresql::StartTable()
{
if (fields != 3) {
throw std::runtime_error("DatabaseExportPostgresql: Wrong number of rows");
if (db_->SqlQuery("BEGIN")) {
transaction_ = true;
start_new_table = true;
}
}

SequenceSchema s;
s.table_name = row[0];
s.column_name = row[1];
void DatabaseExportPostgresql::EndTable()
{
if (transaction_) {
db_->SqlQuery("COMMIT");
transaction_ = false;
}
}

BStringList l(row[2], "'");
if (l.size() != 3) {
throw std::runtime_error(
"DatabaseExportPostgresql: Wrong column_default syntax");
void DatabaseExportPostgresql::CompareRow(const RowData& data)
{
if (start_new_table) {
CursorStartTable(data.table_name);
start_new_table = false;
}
s.sequence_name = l[1];

SequenceSchemaVector* v = static_cast<SequenceSchemaVector*>(ctx);
v->push_back(s);
std::string query{"FETCH NEXT FROM curs1"};

RowData& rd{const_cast<RowData&>(data)};

if (!db_->SqlQuery(query.c_str(), ResultHandlerCompare, &rd)) {
std::string err{
"DatabaseExportPostgresql (compare): Could not execute query: "};
err += db_->get_errmsg();
throw std::runtime_error(err);
}
}

int DatabaseExportPostgresql::ResultHandlerCompare(void* ctx,
int fields,
char** row)
{
const RowData* rd{static_cast<RowData*>(ctx)};

for (int i = 0; i < fields; i++) {
std::string r1{row[i]};
std::string r2{rd->row[i].data_pointer};
if (r1 != r2) { throw std::runtime_error("What??"); }
if (rd->table_name == "File") { std::cout << r1 << " " << r2 << std::endl; }
return 1;
}
return 0;
}

void DatabaseExportPostgresql::End()
void DatabaseExportPostgresql::SelectSequenceSchema()
{
const std::string query{
"select table_name, column_name,"
" column_default from information_schema.columns where table_schema ="
" 'public' and column_default like 'nextval(%';"};

SequenceSchemaVector v;

if (!db_->SqlQuery(query.c_str(), ResultHandler, &v)) {
if (!db_->SqlQuery(query.c_str(), ResultHandlerSequenceSchema,
&sequence_schema_vector_)) {
std::string err{"DatabaseExportPostgresql: Could not change sequence: "};
err += db_->get_errmsg();
throw std::runtime_error(err);
}
}

for (const auto& s : v) {
std::string sequence_schema_query{"select setval(' "};
sequence_schema_query += s.sequence_name;
sequence_schema_query += "', (select max(";
sequence_schema_query += s.column_name;
sequence_schema_query += ") from ";
sequence_schema_query += s.table_name;
sequence_schema_query += "))";
if (!db_->SqlQuery(sequence_schema_query.c_str())) {
throw std::runtime_error(
"DatabaseExportPostgresql: Could not set sequence");
}
// std::cout << "Updating sequence for table: " << s.table_name <<
// std::endl;
int DatabaseExportPostgresql::ResultHandlerSequenceSchema(void* ctx,
int fields,
char** row)
{
if (fields != 3) {
throw std::runtime_error("DatabaseExportPostgresql: Wrong number of rows");
}

SequenceSchema s;
s.table_name = row[0];
s.column_name = row[1];

BStringList l(row[2], "'");
if (l.size() != 3) {
throw std::runtime_error(
"DatabaseExportPostgresql: Wrong column_default syntax");
}
if (!db_->SqlQuery("COMMIT")) { throw std::runtime_error(db_->get_errmsg()); }
s.sequence_name = l[1];

SequenceSchemaVector* v = static_cast<SequenceSchemaVector*>(ctx);
v->push_back(s);
return 0;
}
29 changes: 25 additions & 4 deletions core/src/dird/dbconvert/database_export_postgresql.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,32 @@ class DatabaseExportPostgresql : public DatabaseExport {
bool clear_tables = false);
~DatabaseExportPostgresql();

void Start() override;
void operator<<(const RowData& data) override;
void End() override;
void StartTable() override;
void EndTable() override;

void CopyStart() override;
void CopyRow(const RowData& data) override;
void CopyEnd() override;

virtual void CompareRow(const RowData& data) override;

public:
struct SequenceSchema {
std::string table_name;
std::string column_name;
std::string sequence_name;
};

using SequenceSchemaVector = std::vector<SequenceSchema>;

private:
static int ResultHandler(void* ctx, int fields, char** row);
bool transaction_{false};
bool start_new_table{false};
SequenceSchemaVector sequence_schema_vector_;

void SelectSequenceSchema();
void CursorStartTable(const std::string& table_name);
static int ResultHandlerSequenceSchema(void* ctx, int fields, char** row);
static int ResultHandlerCompare(void* ctx, int fields, char** row);
};
#endif // BAREOS_SRC_DIRD_DBCONVERT_DATABASE_EXPORT_POSTGRESQL_H_
1 change: 1 addition & 0 deletions core/src/dird/dbconvert/database_import.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class DatabaseImport {
virtual ~DatabaseImport();

virtual void ExportTo(DatabaseExport& exporter) = 0;
virtual void CompareWith(DatabaseExport& exporter) = 0;

static std::unique_ptr<DatabaseImport> Create(
const DatabaseConnection& db_connection);
Expand Down

0 comments on commit f340379

Please sign in to comment.