Permalink
Browse files

Replications and now working for MySQL with non-NGS. Some minor bug f…

…ixes and also added mbz_escape_entity() for backend specific table and column name enclosure.
  • Loading branch information...
1 parent c299237 commit b5a52f691ab535142abd83d457799a79d7a86c2a Elliot Chance committed Jun 19, 2010
Showing with 130 additions and 50 deletions.
  1. +11 −0 backend/example.pl
  2. +39 −4 backend/mysql.pl
  3. +14 −4 backend/postgresql.pl
  4. +13 −14 plugins/livestats.pl
  5. +5 −5 plugins/pendinglog.pl
  6. +10 −3 settings.pl
  7. +0 −4 src/builtins.pl
  8. +36 −15 src/functions.pl
  9. +2 −1 update.pl
View
@@ -58,6 +58,17 @@ sub backend_NAME_table_exists {
}
+# mbz_escape_entity($entity)
+# Wnen dealing with table and column names that contain upper and lowercase letters some databases
+# require the table name to be encapsulated. MySQL uses back-ticks, PostgreSQL uses double-quotes.
+# @param $entity The name of the table or column.
+# @return A new encapsulated entity.
+sub backend_NAME_escape_entity {
+ my $entity = $_[0];
+ return $entity;
+}
+
+
# mbz_table_column_exists($table_name, $col_name)
# Check if a table already has a column.
# @param $table_name The name of the table to look for.
View
@@ -27,6 +27,16 @@ sub backend_mysql_create_extra_tables {
}
+# mbz_escape_entity($entity)
+# Wnen dealing with table and column names that contain upper and lowercase letters some databases
+# require the table name to be encapsulated. MySQL uses back-ticks.
+# @return A new encapsulated entity.
+sub backend_mysql_escape_entity {
+ my $entity = $_[0];
+ return "`$entity`";
+}
+
+
# backend_mysql_get_column_type($table_name, $col_name)
# Get the MySQL column type.
# @param $table_name The name of the table.
@@ -110,6 +120,31 @@ sub backend_mysql_load_data {
# @param $id The current replication number. See mbz_get_current_replication().
# @return Always 1.
sub backend_mysql_load_pending {
+ $id = $_[0];
+
+ # make sure there are no pending transactions before cleanup
+ return -1 if(mbz_get_count($g_pending, "") ne '0');
+
+ # perform cleanup (makes sure there no left over records in the PendingData table)
+ $dbh->do("DELETE FROM `$g_pending`");
+
+ # load Pending and PendingData
+ print localtime() . ": Loading pending tables... ";
+ mbz_do_sql(qq|
+ LOAD DATA LOCAL INFILE 'replication/$id/mbdump/$g_pending'
+ INTO TABLE `$g_pending`
+ |);
+ mbz_do_sql(qq|
+ LOAD DATA LOCAL INFILE 'replication/$id/mbdump/$g_pendingdata'
+ INTO TABLE `$g_pendingdata`
+ |);
+ print "Done\n";
+
+ # PLUGIN_beforereplication()
+ foreach my $plugin (@g_active_plugins) {
+ eval("$plugin" . "_beforereplication($id)") or die($!);
+ }
+
return 1;
}
@@ -192,9 +227,10 @@ sub backend_mysql_update_index {
# TEXT then MySQL requires and index length.
my @columns = split(",", $cols);
for(my $i = 0; $i < @columns; ++$i) {
- $columns[$i] = "`" . mbz_trim(mbz_remove_quotes($columns[$i])) . "`";
if(backend_mysql_get_column_type($table_name, mbz_trim($columns[$i])) eq 'text') {
- $columns[$i] .= "(32)";
+ $columns[$i] = "`" . mbz_trim(mbz_remove_quotes($columns[$i])) . "`(32)";
+ } else {
+ $columns[$i] = "`" . mbz_trim(mbz_remove_quotes($columns[$i])) . "`";
}
}
@@ -203,9 +239,8 @@ sub backend_mysql_update_index {
$new_line .= join(",", @columns) . ")";
# all looks good so far ... create the index
- mbz_do_sql($new_line);
-
print "$new_line\n";
+ mbz_do_sql($new_line);
}
close(SQL);
View
@@ -263,17 +263,17 @@ sub backend_postgresql_load_pending {
# load Pending and PendingData
print localtime() . ": Loading pending tables... ";
- open(TABLEDUMP, "replication/$id/mbdump/Pending")
- or warn("Error: cannot open file 'replication/$id/mbdump/Pending'\n");
+ open(TABLEDUMP, "replication/$id/mbdump/$g_pending")
+ or warn("Error: cannot open file 'replication/$id/mbdump/$g_pending'\n");
$dbh->do("COPY $g_pending FROM STDIN");
while($readline = <TABLEDUMP>) {
$dbh->pg_putcopydata($readline);
}
close(TABLEDUMP);
$dbh->pg_putcopyend();
- open(TABLEDUMP, "replication/$id/mbdump/PendingData")
- or warn("Error: cannot open file 'replication/$id/mbdump/PendingData'\n");
+ open(TABLEDUMP, "replication/$id/mbdump/$g_pendingdata")
+ or warn("Error: cannot open file 'replication/$id/mbdump/$g_pendingdata'\n");
$dbh->do("COPY $g_pendingdata FROM STDIN");
while($readline = <TABLEDUMP>) {
$dbh->pg_putcopydata($readline);
@@ -292,5 +292,15 @@ sub backend_postgresql_load_pending {
}
+# mbz_escape_entity($entity)
+# Wnen dealing with table and column names that contain upper and lowercase letters some databases
+# require the table name to be encapsulated. PostgreSQL uses double-quotes.
+# @return A new encapsulated entity.
+sub backend_mysql_escape_entity {
+ my $entity = $_[0];
+ return "\"$entity\"";
+}
+
+
# be nice
return 1;
View
@@ -70,6 +70,15 @@ sub livestats_init {
$sth->execute();
$start = time();
+ # default data
+ mbz_do_sql(qq|
+ insert into livestats values
+ ('sql.insert', 0),
+ ('sql.update', 0),
+ ('sql.delete', 0),
+ ('count.pendinglog', 0)
+ |);
+
while(@result = $sth->fetchrow_array()) {
if($result[0] ne "livestats") {
print " Counting records for table $result[0]... ";
@@ -78,12 +87,11 @@ sub livestats_init {
$table = "`$table`" if($g_db_rdbms eq 'mysql');
# create the key if it doesn't exist
- $sth2 = $dbh->prepare("select count(1) from livestats where name='count.$result[0]'");
+ my $sth2 = $dbh->prepare("select count(1) from livestats where name='count.$result[0]'");
$sth2->execute();
- $key_exists = $sth->fetchrow_array();
+ my @key_exists = $sth2->fetchrow_array();
if($key_exists[0] == 0) {
- mbz_do_sql("insert into livestats (name, val) values ".
- "('count.$result[0]', 0)");
+ mbz_do_sql("insert into livestats (name, val) values ('count.$result[0]', 0)");
}
mbz_do_sql("update livestats set val=(select count(1) from $table) ".
@@ -92,19 +100,10 @@ sub livestats_init {
print "Done\n";
}
}
-
- # default data
- mbz_do_sql(qq|
- insert into livestats values
- ('sql.insert', 0),
- ('sql.update', 0),
- ('sql.delete', 0),
- ('count.pendinglog', 0)
- |);
# TODO: The name of the pending tables depends on if this is NGS or not, there should be extra
# options in settings.pl or builtins.pl to configure these table names.
- mbz_do_sql("update livestats set val=0 where name='Pending' or name='PendingData'");
+ mbz_do_sql("update livestats set val=0 where name='$g_pending' or name='$g_pendingdata'");
return 1;
}
View
@@ -40,15 +40,15 @@ sub pendinglog_beforereplication {
my ($repID) = @_;
mbz_do_sql(qq|
INSERT INTO pendinglog
- SELECT Pending.SeqId, '$repID',
+ SELECT $g_pending.SeqId, '$repID',
substring(TableName from 11 for length(TableName) - 11) as TableName, Op, XID, P1.Data,
P2.Data as keyclause
- FROM Pending
- LEFT JOIN PendingData as P1 on Pending.SeqId=P1.SeqId and P1.IsKey='f'
- LEFT JOIN PendingData as P2 on Pending.SeqId=P2.SeqId and P2.IsKey='t'
+ FROM $g_pending
+ LEFT JOIN $g_pendingdata as P1 on $g_pending.SeqId=P1.SeqId and P1.IsKey='f'
+ LEFT JOIN $g_pendingdata as P2 on $g_pending.SeqId=P2.SeqId and P2.IsKey='t'
|);
mbz_do_sql(qq|
- UPDATE livestats set val=val+(select count(1) from Pending)
+ UPDATE livestats set val=val+(select count(1) from $g_pending)
where name='count.pendinglog'
|);
return 1;
View
@@ -45,11 +45,10 @@
# You may want to ignore certain tables or fields during the replications.
@g_ignore_tables = (
- #'release_group_meta', 'release_group', 'release_groupusecount', 'release_groupwords', 'isrc',
- #'trm', 'trmjoin'
+ # eg. 'trm', 'trmjoin'
);
@g_ignore_fields = (
- #'release_group', 'release_groupusecount', 'trmids'
+ # eg. 'trmids'
);
# Schema. This is where the SQL scripts to create the schema come from, only edit this if you know
@@ -93,4 +92,12 @@
$g_db_port = mbz_get_default_port($g_db_rdbms) if($g_db_port eq 'default');
+if($g_use_ngs) {
+ $g_pending = 'dbmirror_Pending';
+ $g_pendingdata = 'dbmirror_PendingData';
+} else {
+ $g_pending = 'Pending';
+ $g_pendingdata = 'PendingData';
+}
+
return 1;
View
@@ -8,10 +8,6 @@
$g_mv = (($^O eq "MSWin32") ? "move" : "mv");
$g_rm = (($^O eq "MSWin32") ? "del" : "rm");
-# You shouldn't need to change these.
-$g_pending = '"dbmirror_Pending"';
-$g_pendingdata = '"dbmirror_PendingData"';
-
# Return the default port number for a database engine.
sub mbz_get_default_port {
return 3306 if($_[0] eq 'mysql');
View
@@ -149,6 +149,17 @@ sub mbz_download_schema {
}
+# mbz_escape_entity()
+# This subroutine is just a controller that redirects to the escape entity for the RDBMS we are
+# using.
+# @return Passthru from backend_DB_escape_entity().
+sub mbz_escape_entity {
+ # use the subroutine appropriate for the RDBMS
+ my $entity = $_[0];
+ return eval("backend_$g_db_rdbms" . "_escape_entity(\"$entity\");");
+}
+
+
# mbz_first_boot()
# We currently don't need this but may in the future. It is called by init.pl the first time init.pl
# is run.
@@ -185,9 +196,10 @@ sub mbz_format_time {
# @return SQL count() result.
sub mbz_get_count {
my ($table_name, $extra) = @_;
+ $table_name = mbz_escape_entity($table_name);
my $q = $dbh->prepare("select count(1) as count from $table_name $extra");
$q->execute();
- return $rep_total->fetchrow_hashref()->{'count'};
+ return $q->fetchrow_hashref()->{'count'};
}
@@ -271,7 +283,8 @@ sub mbz_load_data {
# @return Passthru from backend_DB_load_pending().
sub mbz_load_pending {
# use the subroutine appropriate for the RDBMS
- return eval("backend_$g_db_rdbms" . "_load_pending();");
+ my $id = $_[0];
+ return eval("backend_$g_db_rdbms" . "_load_pending(\"$id\");");
}
@@ -290,7 +303,7 @@ sub mbz_map_kv {
foreach my $k (keys(%$data)) {
$r .= $join if(!$first);
$first = 0 if($first);
- $r .= "\"$k\"=" . $dbh->quote($data->{$k});
+ $r .= mbz_escape_entity($k) . "=" . $dbh->quote($data->{$k});
}
return $r;
@@ -314,7 +327,7 @@ sub mbz_map_values {
foreach my $k (keys(%$data)) {
$r .= ',' if(!$first);
$first = 0 if($first);
- $r .= "\"$k\"";
+ $r .= mbz_escape_entity($k);
}
$r .= ") values (";
@@ -499,9 +512,14 @@ sub mbz_round {
# inderpendantly in case the user is not using the InnoDB storage engine with MySQL.
# @return Always 1.
sub mbz_run_transactions {
- my $rep_handle = $dbh->prepare("select * from $g_pending left join $g_pendingdata ".
- "on $g_pending.\"SeqId\"=$g_pendingdata.\"SeqId\" ".
- "order by $g_pending.\"SeqId\", \"IsKey\" desc");
+ my $pending = mbz_escape_entity($g_pending);
+ my $pendingdata = mbz_escape_entity($g_pendingdata);
+
+ my $rep_handle = $dbh->prepare(qq|
+ SELECT * from $pending
+ LEFT JOIN $pendingdata ON $pending.SeqId=$pendingdata.SeqId
+ ORDER BY $pending.SeqId, IsKey desc
+ |);
$rep_handle->execute();
my $totalreps = mbz_get_count($g_pending);
$starttime = time() - 1;
@@ -513,20 +531,23 @@ sub mbz_run_transactions {
my $tableName = substr($rep_row[1], 10, length($rep_row[1]) - 11);
if(mbz_in_array(\@g_ignore_tables, $tableName)) {
++$rows if($rep_row[5] eq '0' || $rep_row[2] eq 'd');
- mbz_do_sql("DELETE FROM $g_pending WHERE \"SeqId\"='$rep_row[0]'");
- mbz_do_sql("DELETE FROM $g_pendingdata WHERE \"SeqId\"='$rep_row[0]'");
+ mbz_do_sql("DELETE FROM $pending WHERE SeqId='$rep_row[0]'");
+ mbz_do_sql("DELETE FROM $pendingdata WHERE SeqId='$rep_row[0]'");
next;
}
- $key = mbz_unpack_data($rep_row[6]) if($rep_row[5] eq '1');
- if($rep_row[5] eq '0' || $rep_row[2] eq 'd') {
+ # we use '1' and 't' for MySQL and PostgreSQL
+ $key = mbz_unpack_data($rep_row[6]) if($rep_row[5] eq '1' or $rep_row[5] eq 't');
+
+ # we use '0' and 'f' for MySQL and PostgreSQL
+ if(($rep_row[5] eq '0' || $rep_row[5] eq 'f') || $rep_row[2] eq 'd') {
$data = mbz_unpack_data($rep_row[6]);
# build replicated SQL
my $sql = "insert into ";
$sql = "update " if($rep_row[2] eq 'u');
$sql = "delete from " if($rep_row[2] eq 'd');
- $sql .= "\"$tableName\" ";
+ $sql .= mbz_escape_entity($tableName) . " ";
if($rep_row[2] eq 'i') {
$sql .= mbz_map_values($data, ',');
} elsif($rep_row[2] eq 'u') {
@@ -558,8 +579,8 @@ sub mbz_run_transactions {
}
# clear for next round
- mbz_do_sql("DELETE FROM $g_pending WHERE \"SeqId\"='$rep_row[0]'");
- mbz_do_sql("DELETE FROM $g_pendingdata WHERE \"SeqId\"='$rep_row[0]'");
+ mbz_do_sql("DELETE FROM $pending WHERE SeqId='$rep_row[0]'");
+ mbz_do_sql("DELETE FROM $pendingdata WHERE SeqId='$rep_row[0]'");
undef($key);
undef($data);
++$rows;
@@ -607,7 +628,7 @@ sub mbz_show_update_help {
print " transactions.\n";
print "-p or --onlypending Only process pending transactions then quit.\n";
print "-q or --quiet Non-verbose. The status of each statement is not printed.\n";
- print "-t or --truncate Force TRUNCATE on Pending and PendindData tables.\n";
+ print "-t or --truncate Force TRUNCATE on $g_pending and $g_pendingdata tables.\n";
return 1;
}
View
@@ -12,7 +12,8 @@
require "settings.pl";
require "languages/$g_language.pl";
-require "functions.pl";
+require "backend/$g_db_rdbms.pl";
+require "src/functions.pl";
# require plugin files
foreach my $plugin (@g_active_plugins) {

0 comments on commit b5a52f6

Please sign in to comment.