Skip to content

Commit

Permalink
Add support for concurrent backups with PostgreSQL 9.6
Browse files Browse the repository at this point in the history
As of version 9.6 PostgreSQL support concurrent backups natively.
Add support to the new API, using new calls for concurrent
backups. Users from PostgreSQL 9.2 to 9.5 can continue using
the pgespresso extension.

Code has been refactored accordingly.

Signed-off-by: Giulio Calacoci <giulio.calacoci@2ndquadrant.it>
Signed-off-by: Marco Nenciarini <marco.nenciarini@2ndquadrant.it>
Signed-off-by: Gabriele Bartolini <gabriele.bartolini@2ndQuadrant.it>
  • Loading branch information
gbartolini committed Jun 17, 2016
1 parent 0a646b6 commit fec853b
Show file tree
Hide file tree
Showing 7 changed files with 514 additions and 141 deletions.
218 changes: 150 additions & 68 deletions barman/backup_executor.py
Expand Up @@ -36,9 +36,9 @@
from barman import output, xlog
from barman.command_wrappers import PgBaseBackup, RsyncPgData
from barman.config import BackupOptions
from barman.exceptions import (CommandFailedException, DataTransferFailure,
FsOperationFailed, PostgresConnectionError,
PostgresException, SshCommandException)
from barman.exceptions import (CommandFailedException,
DataTransferFailure, FsOperationFailed,
PostgresConnectionError, SshCommandException)
from barman.fs import UnixRemoteCommand
from barman.infofile import BackupInfo
from barman.remote_status import RemoteStatusMixin
Expand Down Expand Up @@ -281,7 +281,7 @@ def backup(self, backup_info):
# Start the copy
self.current_action = "copying files"
output.info("Copying files.")
self.backup_manager.retry_backup_copy(self.postgres_backup_copy,
self.backup_manager.retry_backup_copy(self.backup_copy,
backup_info)
output.info("Copy done.")
self.strategy.stop_backup(backup_info)
Expand Down Expand Up @@ -389,7 +389,7 @@ def fetch_remote_status(self):

return remote_status

def postgres_backup_copy(self, backup_info):
def backup_copy(self, backup_info):
"""
Perform the actual copy of the backup using pg_basebackup.
First, manages tablespaces, then copies the base backup
Expand Down Expand Up @@ -907,13 +907,18 @@ def __init__(self, executor, mode=None):
self.current_action = None
self.mode = mode

@abstractmethod
def start_backup(self, backup_info):
"""
Issue a start of a backup - invoked by BackupExecutor.backup()
:param barman.infofile.BackupInfo backup_info: backup information
"""
# Retrieve PostgreSQL server metadata
self._pg_get_metadata(backup_info)

# Record that we are about to start the backup
self.current_action = "issuing start backup command"
_logger.debug(self.current_action)

@abstractmethod
def stop_backup(self, backup_info):
Expand Down Expand Up @@ -970,6 +975,35 @@ def _pg_get_metadata(self, backup_info):
msg = "\t%s, %s, %s" % (item.oid, item.name, item.location)
_logger.info(msg)

def _backup_info_from_start_backup(self, backup_info, start_row):
"""
Fill a backup info with information from a start_backup
:param barman.infofile.BackupInfo backup_info: object representing a
backup
:param DictCursor start_row: the result of the pg_start_backup command
"""
backup_info.set_attribute('status', "STARTED")
backup_info.set_attribute('timeline',
int(start_row['file_name'][0:8], 16))
backup_info.set_attribute('begin_xlog', start_row['location'])
backup_info.set_attribute('begin_wal', start_row['file_name'])
backup_info.set_attribute('begin_offset', start_row['file_offset'])
backup_info.set_attribute('begin_time', start_row['timestamp'])

def _backup_info_from_stop_backup(self, backup_info, stop_row):
"""
Fill a backup info with information from a stop_backup
:param barman.infofile.BackupInfo backup_info: object representing a
backup
:param DictCursor stop_row: the result of the pg_stop_backup command
"""
backup_info.set_attribute('end_time', stop_row['timestamp'])
backup_info.set_attribute('end_xlog', stop_row['location'])
backup_info.set_attribute('end_wal', stop_row['file_name'])
backup_info.set_attribute('end_offset', stop_row['file_offset'])


class PostgresBackupStrategy(BackupStrategy):
"""
Expand Down Expand Up @@ -1094,27 +1128,14 @@ def start_backup(self, backup_info):
:param barman.infofile.BackupInfo backup_info: backup information
"""
self.current_action = "connecting to database (%s)" % \
self.executor.config.conninfo
output.debug(self.current_action)
# Retrieve PostgreSQL server metadata
self._pg_get_metadata(backup_info)

# Issue pg_start_backup on the PostgreSQL server
self.current_action = "issuing start backup command"
_logger.debug(self.current_action)
super(ExclusiveBackupStrategy, self).start_backup(backup_info)
label = "Barman backup %s %s" % (
backup_info.server_name, backup_info.backup_id)

# Exclusive backup: issue a pg_start_Backup() command
# Issue an exclusive start backup command
_logger.debug("Start of exclusive backup")
start_row = self.executor.server.postgres.start_exclusive_backup(label)
backup_info.set_attribute('status', "STARTED")
backup_info.set_attribute('timeline',
int(start_row['file_name'][0:8], 16))
backup_info.set_attribute('begin_xlog', start_row['location'])
backup_info.set_attribute('begin_wal', start_row['file_name'])
backup_info.set_attribute('begin_offset', start_row['file_offset'])
backup_info.set_attribute('begin_time', start_row['timestamp'])
self._backup_info_from_start_backup(backup_info, start_row)

def stop_backup(self, backup_info):
"""
Expand All @@ -1128,17 +1149,9 @@ def stop_backup(self, backup_info):
"""

self.current_action = "issuing stop backup command"
_logger.debug("Stop of exclusive backup")
stop_row = self.executor.server.postgres.stop_exclusive_backup()
if stop_row:
backup_info.set_attribute('end_xlog', stop_row['location'])
backup_info.set_attribute('end_wal', stop_row['file_name'])
backup_info.set_attribute('end_offset', stop_row['file_offset'])
backup_info.set_attribute('end_time', stop_row['timestamp'])
else:
raise PostgresException(
'Cannot terminate exclusive backup. '
'You might have to manually execute pg_stop_backup() '
'on your PostgreSQL server')
self._backup_info_from_stop_backup(backup_info, stop_row)

def check(self, check_strategy):
"""
Expand Down Expand Up @@ -1195,6 +1208,23 @@ def _write_backup_label(self, backup_info):
with open(label_file, 'w') as f:
f.write(backup_info.backup_label)

def _write_tablespace_map(self, backup_info):
"""
Write tablespace_map file inside PGDATA folder
:param barman.infofile.BackupInfo backup_info: backup information
"""
map_file = os.path.join(backup_info.get_data_directory(),
'tablespace_map')
output.debug("Writing tablespace map")
with open(map_file, 'w') as f:
for tbs in backup_info.tablespaces:
# In some cases (i.e. PostgreSQL on windows) a tablespace
# can contain a newline or a line feed. PostgreSQL
# pg_basebackup code does the same.
quoted_location = re.sub(r'([\n\r])', r'\\\1', tbs.location)
f.write('%s %s\n' % (tbs.oid, quoted_location))

def start_backup(self, backup_info):
"""
Start of the backup.
Expand All @@ -1204,20 +1234,52 @@ def start_backup(self, backup_info):
:param barman.infofile.BackupInfo backup_info: backup information
"""
self.current_action = "connecting to database (%s)" % \
self.executor.config.conninfo
output.debug(self.current_action)
# with self.executor.server.pg_connect():
# Retrieve PostgreSQL server metadata
self._pg_get_metadata(backup_info)
super(ConcurrentBackupStrategy, self).start_backup(backup_info)

# Issue _pg_start_backup on the PostgreSQL server
self.current_action = "issuing start backup command"
_logger.debug(self.current_action)
label = "Barman backup %s %s" % (
backup_info.server_name, backup_info.backup_id)

# Concurrent backup: issue a pgespresso_start_Backup() command
pg_version = self.executor.server.postgres.server_version
if pg_version >= 90600:
# On 9.6+ execute native concurrent start backup
_logger.debug("Start of native concurrent backup")
self._concurrent_start_backup(backup_info, label)
else:
# On older Postgres use pgespresso
_logger.debug("Start of concurrent backup with pgespresso")
self._pgespresso_start_backup(backup_info, label)

def stop_backup(self, backup_info):
"""
Stop backup wrapper
:param barman.infofile.BackupInfo backup_info: backup information
"""
pg_version = self.executor.server.postgres.server_version
if pg_version >= 90600:
# On 9.6+ execute native concurrent stop backup
_logger.debug("Stop of native concurrent backup")
self._concurrent_stop_backup(backup_info)
else:
# On older Postgres use pgespresso
_logger.debug("Stop of concurrent backup with pgespresso")
self._pgespresso_stop_backup(backup_info)

# Write backup_label retrieved from postgres connection
self.current_action = "writing backup label"
self._write_backup_label(backup_info)

# Write the tablespaces map only if at least a tablespace is present
if backup_info.tablespaces:
self.current_action = "writing tablespace map"
self._write_tablespace_map(backup_info)

def _pgespresso_start_backup(self, backup_info, label):
"""
Start a concurrent backup using pgespresso
:param barman.infofile.BackupInfo backup_info: backup information
"""
postgres = self.executor.server.postgres
start_row = postgres.pgespresso_start_backup(label)
wal_re = re.compile(
Expand All @@ -1235,30 +1297,22 @@ def start_backup(self, backup_info):
wal_info.group(1)))
backup_info.set_attribute('begin_time', start_row['timestamp'])

def stop_backup(self, backup_info):
def _pgespresso_stop_backup(self, backup_info):
"""
Stop backup wrapper
Stop a concurrent backup using pgespresso
:param barman.infofile.BackupInfo backup_info: backup information
"""
postgres = self.executor.server.postgres
stop_row = postgres.pgespresso_stop_backup(backup_info.backup_label)
if stop_row:
decoded_segment = xlog.decode_segment_name(stop_row['end_wal'])
backup_info.set_attribute('end_xlog',
"%X/%X" % (decoded_segment[1],
(decoded_segment[
2] + 1) << 24))
backup_info.set_attribute('end_wal', stop_row['end_wal'])
backup_info.set_attribute('end_offset', 0)
backup_info.set_attribute('end_time', stop_row['timestamp'])
else:
raise PostgresException(
'Cannot terminate exclusive backup. '
'You might have to manually execute '
'pgespresso_abort_backup() on your PostgreSQL server')
self.current_action = "writing backup label"
self._write_backup_label(backup_info)
decoded_segment = xlog.decode_segment_name(stop_row['end_wal'])
backup_info.set_attribute('end_xlog',
"%X/%X" % (decoded_segment[1],
(decoded_segment[
2] + 1) << 24))
backup_info.set_attribute('end_wal', stop_row['end_wal'])
backup_info.set_attribute('end_offset', 0)
backup_info.set_attribute('end_time', stop_row['timestamp'])

def check(self, check_strategy):
"""
Expand All @@ -1267,10 +1321,38 @@ def check(self, check_strategy):
:param CheckStrategy check_strategy: the strategy for the management
of the results of the various checks
"""
if self.executor.server.postgres.has_pgespresso:
check_strategy.result(self.executor.config.name,
'pgespresso extension', True)
else:
check_strategy.result(self.executor.config.name,
'pgespresso extension', False,
'required for concurrent backups')
postgres = self.executor.server.postgres
if postgres.server_version < 90600:
if self.executor.server.postgres.has_pgespresso:
check_strategy.result(self.executor.config.name,
'pgespresso extension', True)
else:
check_strategy.result(self.executor.config.name,
'pgespresso extension', False,
'required for concurrent backups on '
'PostgreSQL %s' %
postgres.server_major_version)

def _concurrent_start_backup(self, backup_info, label):
"""
Start a concurrent backup using the PostgreSQL 9.6
concurrent backup api
:param barman.infofile.BackupInfo backup_info: backup information
:param str label: the backup label
"""
postgres = self.executor.server.postgres
start_row = postgres.start_concurrent_backup(label)
self._backup_info_from_start_backup(backup_info, start_row)

def _concurrent_stop_backup(self, backup_info):
"""
Stop a concurrent backup using the PostgreSQL 9.6
concurrent backup api
:param barman.infofile.BackupInfo backup_info: backup information
"""
postgres = self.executor.server.postgres
stop_row = postgres.stop_concurrent_backup()
self._backup_info_from_stop_backup(backup_info, stop_row)
backup_info.set_attribute('backup_label', stop_row['backup_label'])

0 comments on commit fec853b

Please sign in to comment.