Skip to content

Commit

Permalink
Introduce PostgreSQLClient base class
Browse files Browse the repository at this point in the history
Refactor the PgBaseBackup and PgReceiveXlog classes, by moving common
code in the PostgreSQLClient base class.

Add PgBasebackup class unit tests.

Signed-off-by: Giulio Calacoci <giulio.calacoci@2ndquadrant.it>
Signed-off-by: Marco Nenciarini <marco.nenciarini@2ndquadrant.it>
Signed-off-by: Gabriele Bartolini <gabriele.bartolini@2ndQuadrant.it>
  • Loading branch information
gcalacoci authored and gbartolini committed Jun 17, 2016
1 parent 74619ee commit 0a646b6
Show file tree
Hide file tree
Showing 7 changed files with 423 additions and 222 deletions.
2 changes: 1 addition & 1 deletion .isort.cfg
@@ -1,4 +1,4 @@
[settings]
known_first_party=barman
known_third_party=setuptools,argh,argcomplete,dateutil,psycopg2,mock,pytest
known_third_party=setuptools,distutils,argh,argcomplete,dateutil,psycopg2,mock,pytest
skip=.tox
72 changes: 21 additions & 51 deletions barman/backup_executor.py
Expand Up @@ -30,10 +30,11 @@
import os
import re
from abc import ABCMeta, abstractmethod

from distutils.version import LooseVersion as Version

from barman import output, utils, xlog
from barman.command_wrappers import Command, PgBasebackup, RsyncPgData
from barman import output, xlog
from barman.command_wrappers import PgBaseBackup, RsyncPgData
from barman.config import BackupOptions
from barman.exceptions import (CommandFailedException, DataTransferFailure,
FsOperationFailed, PostgresConnectionError,
Expand Down Expand Up @@ -338,28 +339,18 @@ def fetch_remote_status(self):
None)

# Test pg_basebackup existence
pg_basebackup = utils.which("pg_basebackup",
self.backup_manager.server.path)
if pg_basebackup:
version_info = PgBaseBackup.get_version_info(
self.backup_manager.server.path)
if version_info['full_path']:
remote_status["pg_basebackup_installed"] = True
remote_status["pg_basebackup_path"] = pg_basebackup
remote_status["pg_basebackup_path"] = version_info['full_path']
remote_status["pg_basebackup_version"] = (
version_info['full_version'])
pgbasebackup_version = version_info['major_version']
else:
remote_status["pg_basebackup_installed"] = False
return remote_status

# Obtain the `pg_basebackup` version
pg_basebackup = Command(pg_basebackup, path=self.config.path_prefix,
check=True)
try:
pg_basebackup("--version")
splitter_version = pg_basebackup.out.strip().split()
remote_status["pg_basebackup_version"] = splitter_version[-1]
pgbasebackup_version = Version(
utils.simplify_version(remote_status["pg_basebackup_version"]))
except CommandFailedException as e:
pgbasebackup_version = None
_logger.debug("Error invoking pg_basebackup: %s", e)

# Is bandwidth limit supported?
if remote_status['pg_basebackup_version'] is not None \
and remote_status['pg_basebackup_version'] < '9.4':
Expand Down Expand Up @@ -432,45 +423,24 @@ def postgres_backup_copy(self, backup_info):
if remote_status['pg_basebackup_bwlimit']:
bandwidth_limit = self.config.bandwidth_limit

if remote_status['pg_basebackup_version'] >= '9.3':
# If pg_basebackup version is >= 9.3 we use the connection
# string because allows the user to set all the parameters
# supported by the libpq library to create a connection
connection_string = self.server.streaming.get_connection_string(
self.config.streaming_backup_name)
pg_basebackup = PgBasebackup(
destination=backup_dest,
pg_basebackup=remote_status['pg_basebackup_path'],
conn_string=connection_string,
tbs_mapping=tbs_map,
bwlimit=bandwidth_limit,
immediate=self.config.immediate_checkpoint,
path=self.backup_manager.server.path)
else:
# 9.2 version of pg_basebackup doesn't support
# connection strings so the 'split' version of the conninfo
# option is used instead.
conn_params = self.server.streaming.conn_parameters
pg_basebackup = PgBasebackup(
destination=backup_dest,
pg_basebackup=remote_status['pg_basebackup_path'],
host=conn_params.get('host', None),
port=conn_params.get('port', None),
user=conn_params.get('user', None),
tbs_mapping=tbs_map,
bwlimit=bandwidth_limit,
immediate=self.config.immediate_checkpoint,
path=self.backup_manager.server.path)

pg_basebackup = PgBaseBackup(
connection=self.server.streaming,
destination=backup_dest,
command=remote_status['pg_basebackup_path'],
version=remote_status['pg_basebackup_version'],
app_name=self.config.streaming_backup_name,
tbs_mapping=tbs_map,
bwlimit=bandwidth_limit,
immediate=self.config.immediate_checkpoint,
path=self.backup_manager.server.path)
# Do the actual copy
try:
pg_basebackup()
except CommandFailedException as e:
msg = "data transfer failure on directory '%s'" % \
backup_info.get_data_directory()
raise DataTransferFailure.from_command_error(
'pg_basebackup', e, msg
)
'pg_basebackup', e, msg)


class SshBackupExecutor(with_metaclass(ABCMeta, BackupExecutor)):
Expand Down
229 changes: 160 additions & 69 deletions barman/command_wrappers.py
Expand Up @@ -36,6 +36,7 @@

import dateutil.parser
import dateutil.tz
from distutils.version import LooseVersion as Version

import barman.utils
from barman.exceptions import CommandFailedException, RsyncListFilesFailure
Expand Down Expand Up @@ -769,115 +770,205 @@ def __init__(self, rsync='rsync', args=None, **kwargs):
Rsync.__init__(self, rsync, args=options, **kwargs)


class PgBasebackup(Command):
class PostgreSQLClient(Command):
"""
This class is a wrapper for the pg_basebackup system command
Superclass of all the PostgreSQL client commands.
"""

def __init__(self, destination,
pg_basebackup='pg_basebackup',
conn_string=None,
host=None,
port=None,
user=None,
COMMAND = None

def __init__(self,
connection,
command,
version=None,
app_name=None,
path=None,
**kwargs):
"""
Constructor
:param PostgreSQL connection: an object representing
a database connection
:param str command: the command to use
:param Version version: the command version
:param str app_name: the application name to use for the connection
:param str path: additional path for executable retrieval
"""
Command.__init__(self, command, path=path, **kwargs)

# Check if the command is actually available in path
command_path = barman.utils.which(command, path)
if not command_path:
# Raise an error if not
raise CommandFailedException('%s not in system PATH: '
'is %s installed?' % (command,
command))

if version and version >= Version("9.3"):
# If version of the client is >= 9.3 we use the connection
# string because allows the user to use all the parameters
# supported by the libpq library to create a connection
conn_string = connection.get_connection_string(app_name)
self.args.append("--dbname=%s" % conn_string)
else:
# 9.2 version doesn't support
# connection strings so the 'split' version of the conninfo
# option is used instead.
conn_params = connection.conn_parameters
self.args.append("--host=%s" % conn_params.get('host', None))
self.args.append("--port=%s" % conn_params.get('port', None))
self.args.append("--username=%s" % conn_params.get('user', None))

self.enable_signal_forwarding(signal.SIGINT)
self.enable_signal_forwarding(signal.SIGTERM)

@classmethod
def get_version_info(cls, path=None):
"""
Return a dictionary containing all the info about
the version of the PostgreSQL client
:param str path: the PATH env
"""
if cls.COMMAND is None:
raise NotImplementedError(
"get_version_info cannot be invoked on %s" % cls.__name__)

version_info = dict.fromkeys(('full_path',
'full_version',
'major_version'),
None)

# Retrieve the path of the command
version_info['full_path'] = barman.utils.which(cls.COMMAND, path)
if version_info['full_path'] is None:
# The client is not installed or not working
return version_info

# Get the version string
command = Command(version_info['full_path'], path=path, check=True)
try:
command("--version")
except CommandFailedException as e:
_logger.debug("Error invoking %s: %s", cls.COMMAND, e)
return version_info

# Parse the full text version
full_version = command.out.strip().split()[-1]
version_info['full_version'] = Version(full_version)
# Extract the major version
version_info['major_version'] = Version(barman.utils.simplify_version(
full_version))

return version_info


class PgBaseBackup(PostgreSQLClient):
"""
Wrapper class for the pg_basebackup system command
"""

COMMAND = 'pg_basebackup'

def __init__(self,
connection,
destination,
command=COMMAND,
version=None,
app_name=None,
bwlimit=None,
tbs_mapping=None,
args=None,
path=None,
immediate=False,
check=True,
args=None,
**kwargs):
"""
Constructor
:param str conn_string: connection string
:param str host: the host to connect to
:param str port: the port used for the connection to PostgreSQL
:param str user: the user to use to connect to PostgreSQL
:param str pg_basebackup: command to run
:param PostgreSQL connection: an object representing
a database connection
:param str destination: destination directory path
:param str command: the command to use
:param Version version: the command version
:param str app_name: the application name to use for the connection
:param str bwlimit: bandwidth limit for pg_basebackup
:param Dict[str, str] tbs_mapping: used for tablespace
:param bool immediate: fast checkpoint identifier for pg_basebackup
:param str path: additional path for executable retrieval
:param bool check: check if the return value is in the list of
allowed values of the Command obj
:param List[str] args: additional arguments
:param Dict[str, str] tbs_mapping: used for tablespace
:param str destination: destination directory
relocation
"""
# Check if pg_basebackup is actually available
pg_basebackup_path = barman.utils.which(pg_basebackup, path)
if not pg_basebackup_path:
raise CommandFailedException('pg_basebackup not in system PATH: '
'is pg_basebackup installed?')
PostgreSQLClient.__init__(
self,
connection=connection, command=command,
version=version, app_name=app_name,
check=check, **kwargs)

# Set the backup destination
options = ['-v', '--pgdata=%s' % destination]
self.args += ['-v', '--pgdata=%s' % destination]

# The tablespace mapping option is repeated once for each tablespace
if tbs_mapping:
for (tbs_source, tbs_destination) in tbs_mapping.items():
options.append('--tablespace-mapping=%s=%s' %
(tbs_source, tbs_destination))

# Pass the connections parameters
if conn_string:
options.append("--dbname=%s" % conn_string)
if host:
options.append("--host=%s" % host)
if port:
options.append("--port=%s" % port)
if host:
options.append("--username=%s" % user)
self.args.append('--tablespace-mapping=%s=%s' %
(tbs_source, tbs_destination))

# Only global bandwidth limit is supported
if bwlimit is not None and bwlimit > 0:
options.append("--max-rate=%s" % bwlimit)
self.args.append("--max-rate=%s" % bwlimit)

# Immediate checkpoint
if immediate:
options.append("--checkpoint=fast")
self.args.append("--checkpoint=fast")

# Add other arguments
# Manage additional args
if args:
options += args

Command.__init__(self, pg_basebackup, args=options, check=True,
path=path, **kwargs)
self.args += args


class PgReceiveXlog(Command):
class PgReceiveXlog(PostgreSQLClient):
"""
Wrapper class for pg_receivexlog
"""

COMMAND = "pg_receivexlog"

def __init__(self,
receivexlog='pg_receivexlog',
conn_string=None,
dest=None,
args=None,
connection,
destination,
command=COMMAND,
version=None,
app_name=None,
check=True,
host=None,
port=None,
user=None,
args=None,
**kwargs):
options = [
"""
Constructor
:param PostgreSQL connection: an object representing
a database connection
:param str destination: destination directory path
:param str command: the command to use
:param Version version: the command version
:param str app_name: the application name to use for the connection
:param bool check: check if the return value is in the list of
allowed values of the Command obj
:param List[str] args: additional arguments
"""
PostgreSQLClient.__init__(
self,
connection=connection, command=command,
version=version, app_name=app_name,
check=check, **kwargs)

self.args += [
"--verbose",
"--no-loop",
"--directory=%s" % dest]
# Pass the connections parameters
if conn_string:
options.append("--dbname=%s" % conn_string)
if host:
options.append("--host=%s" % host)
if port:
options.append("--port=%s" % port)
if host:
options.append("--username=%s" % user)
# Add eventual other arguments
"--directory=%s" % destination]

# Manage additional args
if args:
options += args
Command.__init__(self, receivexlog, args=options, check=check,
**kwargs)
self.enable_signal_forwarding(signal.SIGINT)
self.enable_signal_forwarding(signal.SIGTERM)
self.args += args


class BarmanSubProcess(object):
Expand Down

0 comments on commit 0a646b6

Please sign in to comment.