Skip to content

Commit

Permalink
Merge pull request jazzband#73 from ZuluPro/test_dbrestore
Browse files Browse the repository at this point in the history
Added tests and fix dbrestore command
  • Loading branch information
benjaoming committed Jul 31, 2015
2 parents 8a209dd + 7ce1127 commit b64552b
Show file tree
Hide file tree
Showing 10 changed files with 245 additions and 28 deletions.
39 changes: 17 additions & 22 deletions dbbackup/management/commands/dbrestore.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,16 @@
from ...dbcommands import DBCommands
from ...storage.base import BaseStorage
from ...storage.base import StorageError
from dbbackup import settings as dbbackup_settings
from django.conf import settings
from django.core.management.base import BaseCommand
from django.core.management.base import CommandError
from django.core.management.base import LabelCommand
from django.utils import six
from django.db import connection
from optparse import make_option

from dbbackup import settings as dbbackup_settings


# Fix Python 2.x.
try:
input = raw_input # @ReservedAssignment
except NameError:
pass
input = raw_input if six.PY2 else input # @ReservedAssignment


class Command(LabelCommand):
Expand Down Expand Up @@ -73,17 +68,17 @@ def _get_database(self, options):

def restore_backup(self):
""" Restore the specified database. """
print("Restoring backup for database: %s" % self.database['NAME'])
self.stdout.write("Restoring backup for database: %s" % self.database['NAME'])
# Fetch the latest backup if filepath not specified
if not self.filepath:
print(" Finding latest backup")
self.stdout.write(" Finding latest backup")
filepaths = self.storage.list_directory()
filepaths = list(filter(lambda f: f.endswith('.' + self.backup_extension), filepaths))
filepaths = [f for f in filepaths if f.endswith('.' + self.backup_extension)]
if not filepaths:
raise CommandError("No backup files found in: /%s" % self.storage.backup_dir)
self.filepath = filepaths[-1]
# Restore the specified filepath backup
print(" Restoring: %s" % self.filepath)
self.stdout.write(" Restoring: %s" % self.filepath)
input_filename = self.filepath
inputfile = self.storage.read_file(input_filename)
if self.decrypt:
Expand All @@ -95,10 +90,10 @@ def restore_backup(self):
uncompressed_file = self.uncompress_file(inputfile)
inputfile.close()
inputfile = uncompressed_file
print(" Restore tempfile created: %s" % utils.handle_size(inputfile))
cont = input("Are you sure you want to continue? [Y/n]")
if cont.lower() not in ('y', 'yes', ''):
print("Quitting")
self.stdout.write(" Restore tempfile created: %s" % utils.handle_size(inputfile))
answer = input("Are you sure you want to continue? [Y/n]")
if answer.lower() not in ('y', 'yes', ''):
self.stdout.write("Quitting")
sys.exit(0)
inputfile.seek(0)
self.dbcommands.run_restore_commands(inputfile)
Expand All @@ -112,7 +107,7 @@ def uncompress_file(self, inputfile):
outputfile = tempfile.SpooledTemporaryFile(
max_size=500 * 1024 * 1024,
dir=dbbackup_settings.TMP_DIR)
zipfile = gzip.GzipFile(fileobj=inputfile, mode="r")
zipfile = gzip.GzipFile(fileobj=inputfile, mode="rb")
try:
inputfile.seek(0)
outputfile.write(zipfile.read())
Expand All @@ -123,10 +118,10 @@ def uncompress_file(self, inputfile):
def unencrypt_file(self, inputfile):
""" Unencrypt this file using gpg. The input and the output are filelike objects. """
import gnupg

def get_passphrase():
print('Input Passphrase: ')
return input()

return input('Input Passphrase: ')

temp_dir = tempfile.mkdtemp(dir=dbbackup_settings.TMP_DIR)
try:
inputfile.fileno() # Convert inputfile from SpooledTemporaryFile to regular file (Fixes Issue #21)
Expand Down Expand Up @@ -156,8 +151,8 @@ def get_passphrase():

def list_backups(self):
""" List backups in the backup directory. """
print("Listing backups on %s in /%s:" % (self.storage.name, self.storage.backup_dir))
self.stdout.write("Listing backups on %s in /%s:" % (self.storage.name, self.storage.backup_dir))
for filepath in self.storage.list_directory():
print(" %s" % os.path.basename(filepath))
self.stdout.write(" %s" % os.path.basename(filepath))
# TODO: Implement filename_details method
# print(utils.filename_details(filepath))
73 changes: 73 additions & 0 deletions dbbackup/tests/commands/test_dbbackup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import subprocess
from django.test import TestCase
from django.utils import six
from mock import patch
from dbbackup.management.commands.dbbackup import Command as DbbackupCommand
from dbbackup.dbcommands import DBCommands
from dbbackup.tests.utils import FakeStorage, TEST_DATABASE
from dbbackup.tests.utils import GPG_PUBLIC_PATH, DEV_NULL


@patch('dbbackup.settings.GPG_RECIPIENT', 'test@test')
@patch('sys.stdout', DEV_NULL)
class DbbackupCommandSaveNewBackupTest(TestCase):
def setUp(self):
if six.PY3:
self.skipTest("Compression isn't implemented in Python3")
open(TEST_DATABASE['NAME'], 'a+b').close()
self.command = DbbackupCommand()
self.command.servername = 'foo-server'
self.command.encrypt = False
self.command.compress = False
self.command.database = TEST_DATABASE['NAME']
self.command.dbcommands = DBCommands(TEST_DATABASE)
self.command.storage = FakeStorage()
self.command.stdout = DEV_NULL

def test_func(self):
self.command.save_new_backup(TEST_DATABASE)

def test_compress(self):
self.command.compress = True
self.command.save_new_backup(TEST_DATABASE)

def test_encrypt(self):
cmd = ('gpg --import %s' % GPG_PUBLIC_PATH).split()
subprocess.call(cmd, stdout=DEV_NULL, stderr=DEV_NULL)
self.command.encrypt = True
self.command.save_new_backup(TEST_DATABASE)


@patch('sys.stdout', DEV_NULL)
class DbbackupCommandCleanupOldBackupsTest(TestCase):
def setUp(self):
self.command = DbbackupCommand()
self.command.database = TEST_DATABASE['NAME']
self.command.dbcommands = DBCommands(TEST_DATABASE)
self.command.storage = FakeStorage()
self.command.clean = True
self.command.clean_keep = 1
self.command.stdout = DEV_NULL

def test_cleanup_old_backups(self):
self.command.cleanup_old_backups(TEST_DATABASE)

def test_cleanup_empty(self):
self.command.storage.list_files = []
self.command.cleanup_old_backups(TEST_DATABASE)


class DbbackupCommandCompressFileTest(TestCase):
def setUp(self):
if six.PY3:
self.skipTest("Compression isn't implemented in Python3")
open(TEST_DATABASE['NAME'], 'a+b').close()
self.command = DbbackupCommand()
self.command.database = TEST_DATABASE['NAME']
self.command.dbcommands = DBCommands(TEST_DATABASE)
self.command.storage = FakeStorage()
self.command.stdout = DEV_NULL

def test_compress_file(self):
inputfile = open(TEST_DATABASE['NAME'])
self.command.compress_file(inputfile)
110 changes: 110 additions & 0 deletions dbbackup/tests/test_restore.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
import subprocess
from mock import patch
from django.test import TestCase
from django.core.management.base import CommandError
from django.conf import settings
from django.utils import six
from dbbackup.management.commands.dbrestore import Command as DbrestoreCommand
from dbbackup.dbcommands import DBCommands
from dbbackup.tests.utils import FakeStorage, ENCRYPTED_FILE, TEST_DATABASE
from dbbackup.tests.utils import GPG_PRIVATE_PATH, DEV_NULL, COMPRESSED_FILE


@patch('dbbackup.management.commands.dbrestore.input', return_value='y')
class DbrestoreCommandRestoreBackupTest(TestCase):
def setUp(self):
self.command = DbrestoreCommand()
self.command.stdout = DEV_NULL
self.command.uncompress = False
self.command.decrypt = False
self.command.backup_extension = 'bak'
self.command.filepath = 'foofile'
self.command.database = TEST_DATABASE
self.command.dbcommands = DBCommands(TEST_DATABASE)
self.command.storage = FakeStorage()

def test_no_filepath(self, *args):
self.command.storage.list_files = ['foo.bak']
self.command.filepath = None
self.command.restore_backup()

def test_no_backup_found(self, *args):
self.command.filepath = None
with self.assertRaises(CommandError):
self.command.restore_backup()

def test_uncompress(self, *args):
self.command.storage.file_read = COMPRESSED_FILE
self.command.filepath = COMPRESSED_FILE
self.command.uncompress = True
self.command.restore_backup()

def test_decrypt(self, *args):
if six.PY3:
self.skipTest("Decryption isn't implemented in Python3")
cmd = ('gpg --import %s' % GPG_PRIVATE_PATH).split()
subprocess.call(cmd, stdout=DEV_NULL, stderr=DEV_NULL)
self.command.decrypt = True
self.command.restore_backup()


class DbrestoreCommandGetDatabaseTest(TestCase):
def setUp(self):
self.command = DbrestoreCommand()

def test_give_db_name(self):
db = self.command._get_database({'database': 'default'})
self.assertEqual(db, settings.DATABASES['default'])

def test_no_given_db(self):
db = self.command._get_database({})
self.assertEqual(db, settings.DATABASES['default'])

@patch('django.conf.settings.DATABASES', {'db1': {}, 'db2': {}})
def test_no_given_db_multidb(self):
with self.assertRaises(CommandError):
self.command._get_database({})


class DbrestoreCommandGetExtensionTest(TestCase):
def setUp(self):
self.command = DbrestoreCommand()

def test_tar(self):
ext = self.command.get_extension('foo.tar')
self.assertEqual(ext, '.tar')

def test_tar_gz(self):
ext = self.command.get_extension('foo.tar.gz')
self.assertEqual(ext, '.gz')

def test_no_extension(self):
ext = self.command.get_extension('foo')
self.assertEqual(ext, '')


class DbrestoreCommandUncompressTest(TestCase):
def setUp(self):
self.command = DbrestoreCommand()

def test_uncompress(self):
inputfile = open(COMPRESSED_FILE, 'rb')
fd = self.command.uncompress_file(inputfile)
fd.seek(0)
self.assertEqual(fd.read(), b'foo\n')


class DbrestoreCommandDecryptTest(TestCase):
def setUp(self):
self.command = DbrestoreCommand()
cmd = ('gpg --import %s' % GPG_PRIVATE_PATH).split()
subprocess.call(cmd, stdout=DEV_NULL, stderr=DEV_NULL)

@patch('dbbackup.management.commands.dbrestore.input', return_value=None)
def test_decrypt(self, *args):
if six.PY3:
self.skipTest("Decryption isn't implemented in Python3")
inputfile = open(ENCRYPTED_FILE, 'r+b')
uncryptfile = self.command.unencrypt_file(inputfile)
uncryptfile.seek(0)
self.assertEqual('foo\n', uncryptfile.read())
4 changes: 2 additions & 2 deletions dbbackup/tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from mock import patch
from dbbackup import utils

GPG_PRIVATE_PATH = os.path.join(settings.BASE_DIR, 'tests/gpg/pubring.gpg')
GPG_PUBLIC_PATH = os.path.join(settings.BASE_DIR, 'tests/gpg/pubring.gpg')
DEV_NULL = open(os.devnull, 'w')


Expand Down Expand Up @@ -64,7 +64,7 @@ def setUp(self):
self.path = '/tmp/foo'
with open(self.path, 'a') as fd:
fd.write('foo')
cmd = ('gpg --import %s' % GPG_PRIVATE_PATH).split()
cmd = ('gpg --import %s' % GPG_PUBLIC_PATH).split()
subprocess.call(cmd, stdout=DEV_NULL, stderr=DEV_NULL)

def tearDown(self):
Expand Down
32 changes: 32 additions & 0 deletions dbbackup/tests/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import os
from django.conf import settings
from dbbackup.storage.base import BaseStorage

BASE_FILE = os.path.join(settings.BASE_DIR, 'tests/test.txt')
ENCRYPTED_FILE = os.path.join(settings.BASE_DIR, 'tests/test.txt.gpg')
COMPRESSED_FILE = os.path.join(settings.BASE_DIR, 'tests/test.txt.gz')
ENCRYPTED_COMPRESSED_FILE = os.path.join(settings.BASE_DIR, 'tests/test.txt.gz.gpg')
TEST_DATABASE = {'ENGINE': 'django.db.backends.sqlite3', 'NAME': '/tmp/foo.db', 'USER': 'foo', 'PASSWORD': 'bar', 'HOST': 'foo', 'PORT': 122}

GPG_PRIVATE_PATH = os.path.join(settings.BASE_DIR, 'tests/gpg/secring.gpg')
GPG_PUBLIC_PATH = os.path.join(settings.BASE_DIR, 'tests/gpg/pubring.gpg')
DEV_NULL = open(os.devnull, 'w')


class FakeStorage(BaseStorage):
name = 'FakeStorage'
list_files = ['foo', 'bar']
deleted_files = []
file_read = ENCRYPTED_FILE

def delete_file(self, filepath):
self.deleted_files.append(filepath)

def list_directory(self, raw=False):
return self.list_files

def write_file(self, filehandle, filename):
pass

def read_file(self, filepath):
return open(self.file_read, 'rb')
13 changes: 9 additions & 4 deletions tests/runtests.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import sys
import django
from django.conf import settings
from django.core.management import call_command

here = os.path.dirname(os.path.abspath(__file__))
parent = os.path.dirname(here)
Expand Down Expand Up @@ -37,10 +38,14 @@
def main():
if django.VERSION >= (1, 7):
django.setup()
from django.test.runner import DiscoverRunner
runner = DiscoverRunner(failfast=True, verbosity=int(os.environ.get('DJANGO_DEBUG', 1)))
failures = runner.run_tests(['dbbackup'], interactive=True)
sys.exit(failures)
if sys.argv[-1] == 'shell':
call_command('shell')
sys.exit(0)
else:
from django.test.runner import DiscoverRunner
runner = DiscoverRunner(failfast=True, verbosity=int(os.environ.get('DJANGO_DEBUG', 1)))
failures = runner.run_tests(['dbbackup'], interactive=True)
sys.exit(failures)

if __name__ == '__main__':
main()
Binary file added tests/test.gz
Binary file not shown.
2 changes: 2 additions & 0 deletions tests/test.txt.gpg
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
��,ݬ���Qs�2��V!A(���:��K-I�g����Xg�7�M���__���/�
�B3��e����QcjES�v5��H��}� �c:Ui���� ��d��@��u����٘�wf���7�R�_��i�dY�;�G�sX��խQ��#kx=<�U�b�-<�=׼'����3�Ϯ�U��:��NCW������N ?�����E
Expand Down
Binary file added tests/test.txt.gz
Binary file not shown.
Binary file added tests/test.txt.gz.gpg
Binary file not shown.

0 comments on commit b64552b

Please sign in to comment.