diff --git a/dbbackup/management/commands/dbrestore.py b/dbbackup/management/commands/dbrestore.py index 51ad1b58..80905035 100644 --- a/dbbackup/management/commands/dbrestore.py +++ b/dbbackup/management/commands/dbrestore.py @@ -13,21 +13,16 @@ from ...dbcommands import DBCommands from ...storage.base import BaseStorage from ...storage.base import StorageError +from dbbackup import settings as dbbackup_settings from django.conf import settings from django.core.management.base import BaseCommand from django.core.management.base import CommandError from django.core.management.base import LabelCommand +from django.utils import six from django.db import connection from optparse import make_option -from dbbackup import settings as dbbackup_settings - - -# Fix Python 2.x. -try: - input = raw_input # @ReservedAssignment -except NameError: - pass +input = raw_input if six.PY2 else input # @ReservedAssignment class Command(LabelCommand): @@ -73,17 +68,17 @@ def _get_database(self, options): def restore_backup(self): """ Restore the specified database. """ - print("Restoring backup for database: %s" % self.database['NAME']) + self.stdout.write("Restoring backup for database: %s" % self.database['NAME']) # Fetch the latest backup if filepath not specified if not self.filepath: - print(" Finding latest backup") + self.stdout.write(" Finding latest backup") filepaths = self.storage.list_directory() - filepaths = list(filter(lambda f: f.endswith('.' + self.backup_extension), filepaths)) + filepaths = [f for f in filepaths if f.endswith('.' + self.backup_extension)] if not filepaths: raise CommandError("No backup files found in: /%s" % self.storage.backup_dir) self.filepath = filepaths[-1] # Restore the specified filepath backup - print(" Restoring: %s" % self.filepath) + self.stdout.write(" Restoring: %s" % self.filepath) input_filename = self.filepath inputfile = self.storage.read_file(input_filename) if self.decrypt: @@ -95,10 +90,10 @@ def restore_backup(self): uncompressed_file = self.uncompress_file(inputfile) inputfile.close() inputfile = uncompressed_file - print(" Restore tempfile created: %s" % utils.handle_size(inputfile)) - cont = input("Are you sure you want to continue? [Y/n]") - if cont.lower() not in ('y', 'yes', ''): - print("Quitting") + self.stdout.write(" Restore tempfile created: %s" % utils.handle_size(inputfile)) + answer = input("Are you sure you want to continue? [Y/n]") + if answer.lower() not in ('y', 'yes', ''): + self.stdout.write("Quitting") sys.exit(0) inputfile.seek(0) self.dbcommands.run_restore_commands(inputfile) @@ -112,7 +107,7 @@ def uncompress_file(self, inputfile): outputfile = tempfile.SpooledTemporaryFile( max_size=500 * 1024 * 1024, dir=dbbackup_settings.TMP_DIR) - zipfile = gzip.GzipFile(fileobj=inputfile, mode="r") + zipfile = gzip.GzipFile(fileobj=inputfile, mode="rb") try: inputfile.seek(0) outputfile.write(zipfile.read()) @@ -123,10 +118,10 @@ def uncompress_file(self, inputfile): def unencrypt_file(self, inputfile): """ Unencrypt this file using gpg. The input and the output are filelike objects. """ import gnupg + def get_passphrase(): - print('Input Passphrase: ') - return input() - + return input('Input Passphrase: ') + temp_dir = tempfile.mkdtemp(dir=dbbackup_settings.TMP_DIR) try: inputfile.fileno() # Convert inputfile from SpooledTemporaryFile to regular file (Fixes Issue #21) @@ -156,8 +151,8 @@ def get_passphrase(): def list_backups(self): """ List backups in the backup directory. """ - print("Listing backups on %s in /%s:" % (self.storage.name, self.storage.backup_dir)) + self.stdout.write("Listing backups on %s in /%s:" % (self.storage.name, self.storage.backup_dir)) for filepath in self.storage.list_directory(): - print(" %s" % os.path.basename(filepath)) + self.stdout.write(" %s" % os.path.basename(filepath)) # TODO: Implement filename_details method # print(utils.filename_details(filepath)) diff --git a/dbbackup/tests/commands/test_dbbackup.py b/dbbackup/tests/commands/test_dbbackup.py new file mode 100644 index 00000000..c2c4d955 --- /dev/null +++ b/dbbackup/tests/commands/test_dbbackup.py @@ -0,0 +1,73 @@ +import subprocess +from django.test import TestCase +from django.utils import six +from mock import patch +from dbbackup.management.commands.dbbackup import Command as DbbackupCommand +from dbbackup.dbcommands import DBCommands +from dbbackup.tests.utils import FakeStorage, TEST_DATABASE +from dbbackup.tests.utils import GPG_PUBLIC_PATH, DEV_NULL + + +@patch('dbbackup.settings.GPG_RECIPIENT', 'test@test') +@patch('sys.stdout', DEV_NULL) +class DbbackupCommandSaveNewBackupTest(TestCase): + def setUp(self): + if six.PY3: + self.skipTest("Compression isn't implemented in Python3") + open(TEST_DATABASE['NAME'], 'a+b').close() + self.command = DbbackupCommand() + self.command.servername = 'foo-server' + self.command.encrypt = False + self.command.compress = False + self.command.database = TEST_DATABASE['NAME'] + self.command.dbcommands = DBCommands(TEST_DATABASE) + self.command.storage = FakeStorage() + self.command.stdout = DEV_NULL + + def test_func(self): + self.command.save_new_backup(TEST_DATABASE) + + def test_compress(self): + self.command.compress = True + self.command.save_new_backup(TEST_DATABASE) + + def test_encrypt(self): + cmd = ('gpg --import %s' % GPG_PUBLIC_PATH).split() + subprocess.call(cmd, stdout=DEV_NULL, stderr=DEV_NULL) + self.command.encrypt = True + self.command.save_new_backup(TEST_DATABASE) + + +@patch('sys.stdout', DEV_NULL) +class DbbackupCommandCleanupOldBackupsTest(TestCase): + def setUp(self): + self.command = DbbackupCommand() + self.command.database = TEST_DATABASE['NAME'] + self.command.dbcommands = DBCommands(TEST_DATABASE) + self.command.storage = FakeStorage() + self.command.clean = True + self.command.clean_keep = 1 + self.command.stdout = DEV_NULL + + def test_cleanup_old_backups(self): + self.command.cleanup_old_backups(TEST_DATABASE) + + def test_cleanup_empty(self): + self.command.storage.list_files = [] + self.command.cleanup_old_backups(TEST_DATABASE) + + +class DbbackupCommandCompressFileTest(TestCase): + def setUp(self): + if six.PY3: + self.skipTest("Compression isn't implemented in Python3") + open(TEST_DATABASE['NAME'], 'a+b').close() + self.command = DbbackupCommand() + self.command.database = TEST_DATABASE['NAME'] + self.command.dbcommands = DBCommands(TEST_DATABASE) + self.command.storage = FakeStorage() + self.command.stdout = DEV_NULL + + def test_compress_file(self): + inputfile = open(TEST_DATABASE['NAME']) + self.command.compress_file(inputfile) diff --git a/dbbackup/tests/test_restore.py b/dbbackup/tests/test_restore.py new file mode 100644 index 00000000..69c3337a --- /dev/null +++ b/dbbackup/tests/test_restore.py @@ -0,0 +1,110 @@ +import subprocess +from mock import patch +from django.test import TestCase +from django.core.management.base import CommandError +from django.conf import settings +from django.utils import six +from dbbackup.management.commands.dbrestore import Command as DbrestoreCommand +from dbbackup.dbcommands import DBCommands +from dbbackup.tests.utils import FakeStorage, ENCRYPTED_FILE, TEST_DATABASE +from dbbackup.tests.utils import GPG_PRIVATE_PATH, DEV_NULL, COMPRESSED_FILE + + +@patch('dbbackup.management.commands.dbrestore.input', return_value='y') +class DbrestoreCommandRestoreBackupTest(TestCase): + def setUp(self): + self.command = DbrestoreCommand() + self.command.stdout = DEV_NULL + self.command.uncompress = False + self.command.decrypt = False + self.command.backup_extension = 'bak' + self.command.filepath = 'foofile' + self.command.database = TEST_DATABASE + self.command.dbcommands = DBCommands(TEST_DATABASE) + self.command.storage = FakeStorage() + + def test_no_filepath(self, *args): + self.command.storage.list_files = ['foo.bak'] + self.command.filepath = None + self.command.restore_backup() + + def test_no_backup_found(self, *args): + self.command.filepath = None + with self.assertRaises(CommandError): + self.command.restore_backup() + + def test_uncompress(self, *args): + self.command.storage.file_read = COMPRESSED_FILE + self.command.filepath = COMPRESSED_FILE + self.command.uncompress = True + self.command.restore_backup() + + def test_decrypt(self, *args): + if six.PY3: + self.skipTest("Decryption isn't implemented in Python3") + cmd = ('gpg --import %s' % GPG_PRIVATE_PATH).split() + subprocess.call(cmd, stdout=DEV_NULL, stderr=DEV_NULL) + self.command.decrypt = True + self.command.restore_backup() + + +class DbrestoreCommandGetDatabaseTest(TestCase): + def setUp(self): + self.command = DbrestoreCommand() + + def test_give_db_name(self): + db = self.command._get_database({'database': 'default'}) + self.assertEqual(db, settings.DATABASES['default']) + + def test_no_given_db(self): + db = self.command._get_database({}) + self.assertEqual(db, settings.DATABASES['default']) + + @patch('django.conf.settings.DATABASES', {'db1': {}, 'db2': {}}) + def test_no_given_db_multidb(self): + with self.assertRaises(CommandError): + self.command._get_database({}) + + +class DbrestoreCommandGetExtensionTest(TestCase): + def setUp(self): + self.command = DbrestoreCommand() + + def test_tar(self): + ext = self.command.get_extension('foo.tar') + self.assertEqual(ext, '.tar') + + def test_tar_gz(self): + ext = self.command.get_extension('foo.tar.gz') + self.assertEqual(ext, '.gz') + + def test_no_extension(self): + ext = self.command.get_extension('foo') + self.assertEqual(ext, '') + + +class DbrestoreCommandUncompressTest(TestCase): + def setUp(self): + self.command = DbrestoreCommand() + + def test_uncompress(self): + inputfile = open(COMPRESSED_FILE, 'rb') + fd = self.command.uncompress_file(inputfile) + fd.seek(0) + self.assertEqual(fd.read(), b'foo\n') + + +class DbrestoreCommandDecryptTest(TestCase): + def setUp(self): + self.command = DbrestoreCommand() + cmd = ('gpg --import %s' % GPG_PRIVATE_PATH).split() + subprocess.call(cmd, stdout=DEV_NULL, stderr=DEV_NULL) + + @patch('dbbackup.management.commands.dbrestore.input', return_value=None) + def test_decrypt(self, *args): + if six.PY3: + self.skipTest("Decryption isn't implemented in Python3") + inputfile = open(ENCRYPTED_FILE, 'r+b') + uncryptfile = self.command.unencrypt_file(inputfile) + uncryptfile.seek(0) + self.assertEqual('foo\n', uncryptfile.read()) diff --git a/dbbackup/tests/test_utils.py b/dbbackup/tests/test_utils.py index c6db269e..1a3dc7a8 100644 --- a/dbbackup/tests/test_utils.py +++ b/dbbackup/tests/test_utils.py @@ -10,7 +10,7 @@ from mock import patch from dbbackup import utils -GPG_PRIVATE_PATH = os.path.join(settings.BASE_DIR, 'tests/gpg/pubring.gpg') +GPG_PUBLIC_PATH = os.path.join(settings.BASE_DIR, 'tests/gpg/pubring.gpg') DEV_NULL = open(os.devnull, 'w') @@ -64,7 +64,7 @@ def setUp(self): self.path = '/tmp/foo' with open(self.path, 'a') as fd: fd.write('foo') - cmd = ('gpg --import %s' % GPG_PRIVATE_PATH).split() + cmd = ('gpg --import %s' % GPG_PUBLIC_PATH).split() subprocess.call(cmd, stdout=DEV_NULL, stderr=DEV_NULL) def tearDown(self): diff --git a/dbbackup/tests/utils.py b/dbbackup/tests/utils.py new file mode 100644 index 00000000..8f538cea --- /dev/null +++ b/dbbackup/tests/utils.py @@ -0,0 +1,32 @@ +import os +from django.conf import settings +from dbbackup.storage.base import BaseStorage + +BASE_FILE = os.path.join(settings.BASE_DIR, 'tests/test.txt') +ENCRYPTED_FILE = os.path.join(settings.BASE_DIR, 'tests/test.txt.gpg') +COMPRESSED_FILE = os.path.join(settings.BASE_DIR, 'tests/test.txt.gz') +ENCRYPTED_COMPRESSED_FILE = os.path.join(settings.BASE_DIR, 'tests/test.txt.gz.gpg') +TEST_DATABASE = {'ENGINE': 'django.db.backends.sqlite3', 'NAME': '/tmp/foo.db', 'USER': 'foo', 'PASSWORD': 'bar', 'HOST': 'foo', 'PORT': 122} + +GPG_PRIVATE_PATH = os.path.join(settings.BASE_DIR, 'tests/gpg/secring.gpg') +GPG_PUBLIC_PATH = os.path.join(settings.BASE_DIR, 'tests/gpg/pubring.gpg') +DEV_NULL = open(os.devnull, 'w') + + +class FakeStorage(BaseStorage): + name = 'FakeStorage' + list_files = ['foo', 'bar'] + deleted_files = [] + file_read = ENCRYPTED_FILE + + def delete_file(self, filepath): + self.deleted_files.append(filepath) + + def list_directory(self, raw=False): + return self.list_files + + def write_file(self, filehandle, filename): + pass + + def read_file(self, filepath): + return open(self.file_read, 'rb') diff --git a/tests/runtests.py b/tests/runtests.py index c806391a..709ec79e 100644 --- a/tests/runtests.py +++ b/tests/runtests.py @@ -4,6 +4,7 @@ import sys import django from django.conf import settings +from django.core.management import call_command here = os.path.dirname(os.path.abspath(__file__)) parent = os.path.dirname(here) @@ -37,10 +38,14 @@ def main(): if django.VERSION >= (1, 7): django.setup() - from django.test.runner import DiscoverRunner - runner = DiscoverRunner(failfast=True, verbosity=int(os.environ.get('DJANGO_DEBUG', 1))) - failures = runner.run_tests(['dbbackup'], interactive=True) - sys.exit(failures) + if sys.argv[-1] == 'shell': + call_command('shell') + sys.exit(0) + else: + from django.test.runner import DiscoverRunner + runner = DiscoverRunner(failfast=True, verbosity=int(os.environ.get('DJANGO_DEBUG', 1))) + failures = runner.run_tests(['dbbackup'], interactive=True) + sys.exit(failures) if __name__ == '__main__': main() diff --git a/tests/test.gz b/tests/test.gz new file mode 100644 index 00000000..e67280da Binary files /dev/null and b/tests/test.gz differ diff --git a/tests/test.txt.gpg b/tests/test.txt.gpg new file mode 100644 index 00000000..4174dde2 --- /dev/null +++ b/tests/test.txt.gpg @@ -0,0 +1,2 @@ +,ݬQs2V!A(:K-IgXg7M__/ +B3eQcjESv5H} c:Ui d@u٘wf7 R_idY ;GsXխQ#kx=