Base backups through PostgreSQL streaming replication
Enable streaming-replication-only ('streamrep') base backups through the
`backup_method=postgres` configuration option (global/server scope). A valid
streaming replication connection is required.

Barman's implementation relies on `pg_basebackup`, therefore the
following limitations apply:

- `bandwidth_limit` is only available with `pg_basebackup` >= 9.4
- `network_compression`, `reuse_backup` and `tablespace_bandwidth_limit`
  are not supported
- configuration files outside PGDATA are not copied
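
As an illustration (not part of this commit), a server section in barman.conf
enabling the new method could look like the following; the server name, host
and user are placeholder values:

    [main]
    description = "Example server backed up via pg_basebackup"
    conninfo = host=pg.example.com user=barman dbname=postgres
    streaming_conninfo = host=pg.example.com user=streaming_barman
    backup_method = postgres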

Signed-off-by: Stefano Bianucci <stefano.bianucci@2ndquadrant.it>
Signed-off-by: Giulio Calacoci <giulio.calacoci@2ndquadrant.it>
Signed-off-by: Leonardo Cecchi <leonardo.cecchi@2ndquadrant.it>
Signed-off-by: Marco Nenciarini <marco.nenciarini@2ndquadrant.it>
Signed-off-by: Gabriele Bartolini <gabriele.bartolini@2ndQuadrant.it>
Stefano Bianucci authored and gbartolini committed May 30, 2016
1 parent 83e9979 commit 1dd8ea8
Showing 14 changed files with 972 additions and 86 deletions.
8 changes: 6 additions & 2 deletions barman/backup.py
@@ -30,7 +30,8 @@
import dateutil.tz

from barman import output, xlog
from barman.backup_executor import RsyncBackupExecutor, SshCommandException
from barman.backup_executor import (PostgresBackupExecutor,
                                    RsyncBackupExecutor, SshCommandException)
from barman.command_wrappers import DataTransferFailure
from barman.compression import CompressionIncompatibility, CompressionManager
from barman.config import BackupOptions
@@ -76,7 +77,10 @@ def __init__(self, server):
        self.compression_manager = CompressionManager(self.config, server.path)
        self.executor = None
        try:
            self.executor = RsyncBackupExecutor(self)
            if self.config.backup_method == "postgres":
                self.executor = PostgresBackupExecutor(self)
            else:
                self.executor = RsyncBackupExecutor(self)
        except SshCommandException as e:
            self.config.disabled = True
            self.config.msg_list.append(str(e).strip())
411 changes: 406 additions & 5 deletions barman/backup_executor.py

Large diffs are not rendered by default.
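
Since this large diff is not rendered, the following is a rough, hypothetical
sketch of how a pg_basebackup-based executor could drive the new PgBasebackup
wrapper added below. The function name, the error handling and the assumption
that backup_info.get_data_directory() also accepts a tablespace OID are
illustrative guesses, not the actual PostgresBackupExecutor implementation:

    # Hypothetical sketch only -- not the code introduced by this commit.
    from barman.command_wrappers import (CommandFailedException,
                                         DataTransferFailure, PgBasebackup)

    def run_pg_basebackup_copy(config, server_path, backup_info):
        # Relocate every tablespace inside the backup directory through
        # pg_basebackup's --tablespace-mapping option.
        tbs_mapping = {}
        for tablespace in backup_info.tablespaces or []:
            tbs_mapping[tablespace.location] = \
                backup_info.get_data_directory(tablespace.oid)

        pg_basebackup = PgBasebackup(
            destination=backup_info.get_data_directory(),
            conn_string=config.streaming_conninfo,
            bwlimit=config.bandwidth_limit,
            tbs_mapping=tbs_mapping,
            path=server_path)
        try:
            # Command objects are callable and run the underlying command
            pg_basebackup()
        except CommandFailedException as e:
            raise DataTransferFailure('pg_basebackup failed: %s' % e)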

74 changes: 74 additions & 0 deletions barman/command_wrappers.py
@@ -806,6 +806,80 @@ def __init__(self, rsync='rsync', args=None, **kwargs):
        Rsync.__init__(self, rsync, args=options, **kwargs)


class PgBasebackup(Command):
    """
    This class is a wrapper for the pg_basebackup system command
    """

    def __init__(self, destination,
                 pg_basebackup='pg_basebackup',
                 conn_string=None,
                 host=None,
                 port=None,
                 user=None,
                 bwlimit=None,
                 tbs_mapping=None,
                 args=None,
                 path=None,
                 immediate=False,
                 **kwargs):
        """
        Constructor
        :param str conn_string: connection string
        :param str host: the host to connect to
        :param str port: the port used for the connection to PostgreSQL
        :param str user: the user to use to connect to PostgreSQL
        :param str pg_basebackup: command to run
        :param str bwlimit: bandwidth limit for pg_basebackup
        :param bool immediate: fast checkpoint identifier for pg_basebackup
        :param str path: additional path for executable retrieval
        :param List[str] args: additional arguments
        :param Dict[str, str] tbs_mapping: used for tablespace relocation
        :param str destination: destination directory
        """
        # Check if pg_basebackup is actually available
        pg_basebackup_path = barman.utils.which(pg_basebackup, path)
        if not pg_basebackup_path:
            raise CommandFailedException('pg_basebackup not in system PATH: '
                                         'is pg_basebackup installed?')

        # Set the backup destination
        options = ['-v', '--pgdata=%s' % destination]

        # The tablespace mapping option is repeated once for each tablespace
        if tbs_mapping:
            for (tbs_source, tbs_destination) in tbs_mapping.items():
                options.append('--tablespace-mapping=%s=%s' %
                               (tbs_source, tbs_destination))

        # Pass the connection parameters
        if conn_string:
            options.append("--dbname=%s" % conn_string)
        if host:
            options.append("--host=%s" % host)
        if port:
            options.append("--port=%s" % port)
        if user:
            options.append("--username=%s" % user)

        # Only global bandwidth limit is supported
        if bwlimit is not None and bwlimit > 0:
            options.append("--max-rate=%s" % bwlimit)

        # Immediate checkpoint
        if immediate:
            options.append("--checkpoint=fast")

        # Add other arguments
        if args:
            options += args

        Command.__init__(self, pg_basebackup, args=options, check=True,
                         path=path, **kwargs)


class PgReceiveXlog(Command):
"""
Wrapper class for pg_receivexlog
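
A minimal usage sketch of the PgBasebackup wrapper above (again, not part of
the diff); the destination path, connection string and tablespace mapping are
made-up values, and the final call relies on Command objects being callable:

    from barman.command_wrappers import PgBasebackup

    pg_basebackup = PgBasebackup(
        destination='/srv/barman/main/base/20160530T120000/data',
        conn_string='host=pg.example.com user=streaming_barman',
        tbs_mapping={'/srv/pgtbs1':
                     '/srv/barman/main/base/20160530T120000/tbs1'},
        immediate=True)   # adds --checkpoint=fast
    pg_basebackup()       # runs: pg_basebackup -v --pgdata=... [options]
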
6 changes: 4 additions & 2 deletions barman/config.py
@@ -96,7 +96,7 @@ def parse(self, value, key, source):
        Parses a list of values and correctly assign the set of values
        (removing duplication) and checking for conflicts.
        """
        if value == '':
        if not value:
            return
        values_list = value.split(',')
        for val in sorted(values_list):
@@ -155,6 +155,8 @@ def validate(self, key, source):
        Validates backup_option values: currently it makes sure
        that either exclusive_backup or concurrent_backup are set.
        """
        if len(self) == 0:
            return
        if self.CONCURRENT_BACKUP not in self \
                and self.EXCLUSIVE_BACKUP not in self:
            raise ValueError("Invalid configuration value for "
@@ -344,7 +346,7 @@ class ServerConfig(object):
        'archiver': 'on',
        'backup_directory': '%(barman_home)s/%(name)s',
        'backup_method': 'rsync',
        'backup_options': "%s" % BackupOptions.EXCLUSIVE_BACKUP,
        'backup_options': '',
        'basebackup_retry_sleep': '30',
        'basebackup_retry_times': '0',
        'basebackups_directory': '%(backup_directory)s/base',
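
To illustrate the effect of the two changes above, the sketch below assumes
the CsvOption constructor signature implied by the parse() and validate()
methods shown earlier; it is not code from this commit:

    from barman.config import BackupOptions

    # An empty backup_options value (the new default) parses to an empty set
    # and now passes validation, so a server using backup_method = postgres
    # no longer inherits the exclusive_backup default.
    opts = BackupOptions('', 'backup_options', '/etc/barman.conf')
    assert len(opts) == 0

    # Asking for both modes at once is still rejected as a conflict.
    try:
        BackupOptions('exclusive_backup,concurrent_backup',
                      'backup_options', '/etc/barman.conf')
    except ValueError as exc:
        print(exc)
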
13 changes: 13 additions & 0 deletions barman/output.py
@@ -438,6 +438,19 @@ def result_recovery(self, results):
                          assertion.key,
                          assertion.value)

        if results['missing_files']:
            # At least one file is missing, warn the user
            self.info("")
            self.info("WARNING")
            self.info("The following configuration files have not been "
                      "saved during backup, hence they have not been "
                      "restored.")
            self.info("You need to manually restore them "
                      "in order to start the recovered PostgreSQL instance:")
            self.info("")
            for file_name in results['missing_files']:
                self.info(" %s" % file_name)

        if results['delete_barman_xlog']:
            self.info("")
            self.info("After the recovery, please remember to remove the "
21 changes: 20 additions & 1 deletion barman/recovery_executor.py
@@ -164,6 +164,24 @@ def map_temporary_config_files(self, recovery_info, backup_info,
        :param barman.infofile.BackupInfo backup_info: a backup representation
        :param str remote_command: ssh command for remote recovery
        """

        # Cycle over postgres configuration files which may be missing.
        # If a file is missing, we will be unable to restore it and
        # we will warn the user.
        # This can happen if we are using pg_basebackup and
        # a configuration file is located outside the data dir.
        # This is not an error condition, so we also check for
        # `pg_ident.conf`, which is an optional file.
        for conf_file in (recovery_info['configuration_files'] +
                          ['pg_hba.conf', 'pg_ident.conf']):
            source_path = os.path.join(
                backup_info.get_data_directory(), conf_file)
            if not os.path.exists(source_path):
                recovery_info['results']['missing_files'].append(conf_file)
                # Remove the file from the list of configuration files
                if conf_file in recovery_info['configuration_files']:
                    recovery_info['configuration_files'].remove(conf_file)

        for conf_file in recovery_info['configuration_files']:
            if remote_command:
                # If the recovery is remote, copy the postgresql.conf
@@ -175,11 +193,11 @@ def map_temporary_config_files(self, recovery_info, backup_info,
                shutil.copy2(
                    os.path.join(backup_info.get_data_directory(),
                                 conf_file), conf_file_path)
            # If it is a remote recovery the conf files are inside a temporary dir
            else:
                # Otherwise use the local destination path.
                conf_file_path = os.path.join(
                    recovery_info['destination_path'], conf_file)

            recovery_info['temporary_configuration_files'].append(
                conf_file_path)

@@ -472,6 +490,7 @@ def setup(self, backup_info, remote_command, dest):
            'changes': [],
            'warnings': [],
            'delete_barman_xlog': False,
            'missing_files': [],
            'get_wal': False,
        }
        recovery_info['results'] = results
38 changes: 27 additions & 11 deletions barman/server.py
@@ -165,20 +165,25 @@ def __init__(self, config):
        self.config = config
        self.path = self._build_path(self.config.path_prefix)
        self.process_manager = ProcessManager(self.config)
        self.backup_manager = BackupManager(self)

        self.enforce_retention_policies = False
        self.postgres = None
        self.streaming = None
        self.archivers = []

        # Initialize the backup manager
        self.backup_manager = BackupManager(self)

        # Initialize the main PostgreSQL connection
        try:
            self.postgres = PostgreSQLConnection(config)
        # If the PostgreSQLConnection creation fails, disable the Server
        except ConninfoException as e:
            self.config.disabled = True
            self.config.msg_list.append("conninfo: " + str(e).strip())

        # Order of items in self.archivers list is important!
        # Initialize the FileWalArchiver
        # WARNING: Order of items in self.archivers list is important!
        # The files will be archived in that order.
        if self.config.archiver:
            try:
@@ -195,21 +200,32 @@ def __init__(self, config):
self.config.msg_list.append("The option archiver = off "
"is not yet supported")

if self.config.streaming_archiver:
# Initialize the streaming PostgreSQL connection only when
# backup_method is postgres or the streaming_archiver is in use
if (self.config.backup_method == 'postgres' or
self.config.streaming_archiver):
try:
self.streaming = StreamingConnection(config)
self.archivers.append(StreamingWalArchiver(
self.backup_manager))
# If the StreamingConnection creation fails, disable the Server
# If the StreamingConnection creation fails, disable the server
except ConninfoException as e:
self.config.disabled = True
self.config.msg_list.append("streaming_conninfo: " +
str(e).strip())
except AttributeError as e:
_logger.debug(e)
self.config.disabled = True
self.config.msg_list.append('Unable to initialise the '
'streaming archiver')

# Initialize the StreamingWalArchiver
# WARNING: Order of items in self.archivers list is important!
# The files will be archived in that order.
if self.config.streaming_archiver:
try:
self.archivers.append(StreamingWalArchiver(
self.backup_manager))
# If the StreamingWalArchiver creation fails,
# disable the server
except AttributeError as e:
_logger.debug(e)
self.config.disabled = True
self.config.msg_list.append('Unable to initialise the '
'streaming archiver')
if len(self.archivers) < 1:
self.config.disabled = True
self.config.msg_list.append(
