Permalink
Browse files

Database exporting

  • Loading branch information...
lalinsky committed Dec 12, 2011
1 parent f47e68c commit 11b094f91d745d4c6c1fadda501270f755e217b3
Showing with 212 additions and 28 deletions.
  1. +40 −9 admin/cron/export-all.sh
  2. +16 −0 admin/cron/musicbrainz.sh
  3. +14 −0 admin/run-data-backup.sh
  4. +8 −0 admin/run-mbslave.sh
  5. +1 −1 run_psql.sh
  6. +132 −17 scripts/export_tables.py
  7. +1 −1 scripts/run_psql.py
View
@@ -1,15 +1,46 @@
-#!/bin/sh
+#!/usr/bin/env bash
set -e
-DIR=/home/acoustid/acoustid-server/
+LOCKNAME=acoustid-export
+. `dirname $0`/lock.sh  # lock.sh is not shown here; presumably it uses LOCKNAME to block concurrent runs -- TODO confirm
-TARGET_DIR=/home/acoustid/data/`date '+%Y-%m-%d'`/
+DIR=`dirname $0`/../..
+export PYTHONPATH=$DIR
-cd $DIR
-rm -rf /tmp/acoustid-dump/ && PYTHONPATH=$DIR python scripts/export_tables.py -c acoustid.conf
-cd /tmp
-tar -jcvf acoustid-dump.tar.bz2 acoustid-dump
-mkdir -p $TARGET_DIR
-mv acoustid-dump.tar.bz2 $TARGET_DIR
+TEMP_DIR=/tmp/acoustid-export
+DATA_DIR=/home/acoustid/data
+TARGET_DIR=$DATA_DIR/fullexport/`date '+%Y-%m-%d'`
+
+rm -rf $TEMP_DIR
+mkdir $TEMP_DIR
+
+$DIR/scripts/export_tables.py -q -c $DIR/acoustid.conf "$@"  # "$@" forwards this script's own arguments to export_tables.py
+
+cd $TEMP_DIR
+
+# publish replication packets
+for NAME in acoustid-update acoustid-musicbrainz-update; do
+ bzip2 $NAME-*.xml
+ gpg -a --batch --passphrase-file ~/.gnupg/passphrase --detach-sign $NAME-*.xml.bz2  # ASCII-armored detached signature; the .asc file is moved on the next line
+ mv $NAME-*.xml.bz2{,.asc} $DATA_DIR/replication/  # brace expansion: moves both the .bz2 packet(s) and the matching .asc signature(s)
+done
+
+# publish data dumps
+for NAME in acoustid-dump acoustid-musicbrainz-dump; do
+ if [ -d $NAME ]; then  # presumably the directory exists only when export_tables.py wrote a table dump -- TODO confirm
+ tar -cvf $NAME.tar.bz2 --use-compress-prog=pbzip2 $NAME
+ gpg -a --batch --passphrase-file ~/.gnupg/passphrase --detach-sign $NAME.tar.bz2
+ mkdir -p $TARGET_DIR
+ mv $NAME.tar.bz2{,.asc} $TARGET_DIR
+ fi
+done
+
+echo `basename $TARGET_DIR` >$DATA_DIR/fullexport/latest  # NOTE(review): written on every run, even when the loop above created no $TARGET_DIR -- confirm this is intended
+
+# synchronize backups
+test -x $DIR/admin/run-data-backup.sh && $DIR/admin/run-data-backup.sh  # optional step: runs only when the script exists and is executable
+
+# clean up
+rm -rf $TEMP_DIR
View
@@ -0,0 +1,16 @@
+#!/usr/bin/env bash
+
+set -e
+
+LOCKNAME=acoustid-musicbrainz
+. `dirname $0`/lock.sh  # lock.sh is not shown here; presumably it uses LOCKNAME to block concurrent runs -- TODO confirm
+
+DIR=`dirname $0`/../..  # repository root, two levels above this script's directory
+export PYTHONPATH=$DIR
+
+# update the MusicBrainz database
+test -x $DIR/admin/run-mbslave.sh && $DIR/admin/run-mbslave.sh  # optional step: runs only when the script exists and is executable
+
+# fix merged MBIDs
+$DIR/scripts/merge_missing_mbids.py -q -c $DIR/acoustid.conf
+
View
@@ -0,0 +1,14 @@
+#!/bin/sh
+
+set -e
+
+DATA_DIR=/home/acoustid/data
+
+# synchronize backups
+echo >> /var/log/acoustid/backup.log  # blank line separates this run from the previous log entries
+date >> /var/log/acoustid/backup.log  # timestamp for this run
+rsync -av --delete --delete-after --fuzzy -e \
+ "ssh -i /home/acoustid/.ssh/backup" \
+ $DATA_DIR/ backup.acoustid.org:$DATA_DIR/ \
+ >> /var/log/acoustid/backup.log  # only stdout is appended to the log; stderr is not redirected here
+
View
@@ -0,0 +1,8 @@
+#!/bin/sh
+
+set -e
+
+echo >>/var/log/acoustid/mbslave.log  # blank line separates this run from the previous log entries
+date >>/var/log/acoustid/mbslave.log  # timestamp for this run
+/home/acoustid/mbslave/mbslave-sync.py >>/var/log/acoustid/mbslave.log  # only stdout is appended to the log; stderr is not redirected here
+
View
@@ -1,5 +1,5 @@
#!/bin/sh
DIR=`dirname $0`  # directory containing this wrapper; used to locate the script and config
-PYTHONPATH=$DIR:$PYTHONPATH $DIR/scripts/run_psql.py -c $DIR/acoustid.conf -q
+PYTHONPATH=$DIR:$PYTHONPATH $DIR/scripts/run_psql.py -c $DIR/acoustid.conf -q -- "$@"  # "$@" forwards this wrapper's arguments; "--" presumably stops run_psql.py from parsing them as its own options -- TODO confirm
View
@@ -5,37 +5,50 @@
import os
import logging
+import skytools
+import xml.etree.cElementTree as etree
+import psycopg2.extensions
+from contextlib import closing
from acoustid.script import run_script
from acoustid.data.track import merge_missing_mbids
logger = logging.getLogger(__name__)
-TABLES = [
- ("account", "SELECT id, 'account' || id::text, 'apikey' || id::text, '', anonymous, created, lastlogin, submission_count FROM account"),
- ("account_stats_control", None),
- ("application", "SELECT id, 'app' || id::text, '', 'apikey' || id::text, created, active, account_id FROM application"),
+CORE_TABLES = [  # (table, sql) pairs in the shape export_tables() iterates over; the only visible call using this list is commented out in main()
("fingerprint", None),
- ("fingerprint_source", None),
("format", None),
("meta", None),
- ("source", None),
+ ("replication_control", None),
("stats", None),
- ("stats_top_accounts", None),
+ ("track", None),
("track_mbid", None),
- ("track_mbid_source", None),
("track_meta", None),
- ("track_meta_source", None),
- ("track", None),
("track_puid", None),
+]
+
+PRIVATE_TABLES = [  # (table, sql) pairs like the other lists; not passed to export_tables() anywhere in the visible code
+ #("account", "SELECT id, 'account' || id::text, 'apikey' || id::text, '', anonymous, created, lastlogin, submission_count FROM account"),
+ #("account_stats_control", None),
+ #("application", "SELECT id, 'app' || id::text, '', 'apikey' || id::text, created, active, account_id FROM application"),
+ ("fingerprint_source", None),
+ ("source", None),
+ ("track_mbid_change", None),
+ ("track_mbid_source", None),
+ ("track_meta_source", None),
("track_puid_source", None),
]
-def export_tables(cursor):
- cursor.execute("SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL SERIALIZABLE")
- base_path = "/tmp/acoustid-dump"
+MUSICBRAINZ_TABLES = [  # (table, sql) pairs; main() passes this list to export_tables() as 'acoustid-musicbrainz-dump'
+ ("acoustid_mb_replication_control", None),
+ ("recording_acoustid", None),
+]
+
+
+def export_tables(cursor, name, tables):
+ base_path = os.path.join("/tmp/acoustid-export", name)
os.mkdir(base_path)
- for table, sql in TABLES:
+ for table, sql in tables:
path = os.path.join(base_path, table)
logger.info("Exporting %s to %s", table, path)
with open(path, 'w') as fileobj:
@@ -46,10 +59,112 @@ def export_tables(cursor):
cursor.copy_expert(copy_sql, fileobj)
+def dump_colums(root, root_name, columns):  # append a <root_name> child under `root` with one <column> element per entry in `columns`
+ if columns:  # presumably nothing is added when `columns` is empty or None (nesting is flattened in this view)
+ node = etree.SubElement(root, root_name)
+ for name, value in columns.iteritems():  # iteritems() exists only on Python 2 dicts
+ column_node = etree.SubElement(node, 'column')
+ column_node.attrib['name'] = name
+ if value is None:
+ column_node.attrib['null'] = 'yes'  # a None value is marked with null="yes" instead of element text
+ else:
+ column_node.text = value.decode('UTF-8')  # assumes value is a UTF-8 encoded byte string
+
+
+def create_musicbrainz_replication_packet(cursor):  # write the queued changes for the two MusicBrainz-related tables to one XML packet file
+ cursor.execute("""
+ UPDATE acoustid_mb_replication_control
+ SET current_replication_sequence = current_replication_sequence + 1,
+ last_replication_date = now()
+ RETURNING current_schema_sequence, current_replication_sequence""")  # bump the packet counter and read back both sequence numbers
+ schema_seq, replication_seq = cursor.fetchone()
+ cursor.execute("""
+ SELECT * FROM mirror_queue
+ WHERE tblname IN ('recording_acoustid', 'acoustid_mb_replication_control')
+ ORDER BY txid, id""")  # ordered so that rows belonging to one txid are adjacent
+ packet_node = etree.Element('packet')
+ packet_node.attrib['schema_seq'] = str(schema_seq)
+ packet_node.attrib['replication_seq'] = str(replication_seq)
+ transaction_node = None
+ transaction_id = None
+ for seqid, txid, table, operation, data in cursor:  # relies on SELECT * yielding exactly these five columns in this order
+ if transaction_id is None or transaction_id != txid:  # txid changed: open a new <transaction> element
+ transaction_node = etree.SubElement(packet_node, 'transaction')
+ transaction_node.attrib['id'] = str(txid)
+ transaction_id = txid
+ event_node = etree.SubElement(transaction_node, 'event')
+ event_node.attrib['table'] = table
+ event_node.attrib['op'] = operation
+ event_node.attrib['id'] = str(seqid)
+ keys, values = skytools.parse_logtriga_sql(operation, data.encode('UTF-8'), splitkeys=True)  # presumably splits the logged change into key columns and value columns -- TODO confirm against skytools
+ dump_colums(event_node, 'keys', keys)
+ dump_colums(event_node, 'values', values)
+ fp = open('/tmp/acoustid-export/acoustid-musicbrainz-update-%d.xml' % replication_seq, 'w')  # file name carries the new replication sequence number
+ fp.write(etree.tostring(packet_node, encoding="UTF-8"))
+ fp.flush()
+ os.fsync(fp.fileno())  # force the packet to disk before closing
+ fp.close()  # NOTE(review): not reached if write/fsync raises; consider try/finally or a with block
+
+
+def create_replication_packet(cursor):  # write the queued changes for every table except the two MusicBrainz-related ones to one XML packet file
+ cursor.execute("""
+ UPDATE replication_control
+ SET current_replication_sequence = current_replication_sequence + 1,
+ last_replication_date = now()
+ RETURNING current_schema_sequence, current_replication_sequence""")  # bump the packet counter and read back both sequence numbers
+ schema_seq, replication_seq = cursor.fetchone()
+ cursor.execute("""
+ SELECT * FROM mirror_queue
+ WHERE tblname NOT IN ('recording_acoustid', 'acoustid_mb_replication_control')
+ ORDER BY txid, id""")  # ordered so that rows belonging to one txid are adjacent
+ packet_node = etree.Element('packet')
+ packet_node.attrib['schema_seq'] = str(schema_seq)
+ packet_node.attrib['replication_seq'] = str(replication_seq)
+ transaction_node = None
+ transaction_id = None
+ for seqid, txid, table, operation, data in cursor:  # relies on SELECT * yielding exactly these five columns in this order
+ if transaction_id is None or transaction_id != txid:  # txid changed: open a new <transaction> element
+ transaction_node = etree.SubElement(packet_node, 'transaction')
+ transaction_node.attrib['id'] = str(txid)
+ transaction_id = txid
+ event_node = etree.SubElement(transaction_node, 'event')
+ event_node.attrib['table'] = table
+ event_node.attrib['op'] = operation
+ event_node.attrib['id'] = str(seqid)
+ keys, values = skytools.parse_logtriga_sql(operation, data.encode('UTF-8'), splitkeys=True)  # presumably splits the logged change into key columns and value columns -- TODO confirm against skytools
+ dump_colums(event_node, 'keys', keys)
+ dump_colums(event_node, 'values', values)
+ fp = open('/tmp/acoustid-export/acoustid-update-%d.xml' % replication_seq, 'w')  # file name carries the new replication sequence number
+ fp.write(etree.tostring(packet_node, encoding="UTF-8"))
+ fp.flush()
+ os.fsync(fp.fileno())  # force the packet to disk before closing
+ fp.close()  # NOTE(review): not reached if write/fsync raises; consider try/finally or a with block
+
+
+def export_replication(cursor):  # write both replication packets, then empty the change queue
+ create_replication_packet(cursor)
+ create_musicbrainz_replication_packet(cursor)
+ cursor.execute("DELETE FROM mirror_queue")  # NOTE(review): unconditional delete; presumably relies on the caller's transaction isolation so rows queued meanwhile are not lost -- confirm
+
+
def main(script, opts, args):
conn = script.engine.connect()
- cursor = conn.engine.raw_connection().cursor()
- export_tables(cursor)
+ conn.detach()  # presumably takes the connection out of the engine's pool so the session settings below do not leak -- TODO confirm
+ with closing(conn):
+ conn.connection.rollback()  # presumably ends any open transaction so that set_session() is permitted -- TODO confirm
+ conn.connection.set_session(psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE)  # the export runs at SERIALIZABLE isolation
+ cursor = conn.connection.cursor()
+ export_replication(cursor)  # called before the --full check, so replication packets are written on every run
+ if opts.full:  # table dumps only when -f/--full was given
+ #export_tables(cursor, 'acoustid-dump', CORE_TABLES)
+ export_tables(cursor, 'acoustid-musicbrainz-dump', MUSICBRAINZ_TABLES)
+ conn.connection.commit()  # NOTE(review): indentation is lost in this view; presumably runs regardless of --full so the sequence bump and queue delete are persisted -- confirm
+
+
+def add_options(parser):  # registers the script-specific -f/--full flag; passed to run_script() alongside main
+ parser.add_option("-f", "--full", dest="full", action="store_true",
+ default=False, help="full export")  # off by default; read in main() as opts.full
+
-run_script(main)
+run_script(main, add_options)
View
@@ -8,7 +8,7 @@
def main(script, opts, args):
- os.execlp('psql', 'psql', *script.config.database.create_psql_args())
+ os.execlp('psql', 'psql', *(script.config.database.create_psql_args() + args))  # replaces this process with psql; `+ args` assumes create_psql_args() returns a list -- TODO confirm
run_script(main)

0 comments on commit 11b094f

Please sign in to comment.