Skip to content

Commit

Permalink
walreceiver: Add experimental support for replicating WALs via PG rep…
Browse files Browse the repository at this point in the history
…lication protocol

You can now use "active_backup_mode": "walreceiver" in case you have a psycopg2
that supports the PostgreSQL replication protocol. At this time the support hasn't
been merged into psycopg2 master but can be found from:
psycopg/psycopg2#322

You can now use "active_backup_mode": "walreceiver" in case you have a psycopg2
that supports the PostgreSQL replication protocol. At this time the support hasn't
been merged into psycopg2 master but can be found from:

    psycopg/psycopg2#322

The backup method "walreceiver" writes no extra WAL data to disk on the machine
taking the backup.

Before this when using pg_receivexlog mode pghoard first wrote the files uncompressed
on disk once and then compressed them and wrote them on disk again causing roughly
1.5x the size of WAL writes just so pghoard could back the files up.

We now also write the last flush_lsn position into pghoard state file and read
it back from there when creating a walreceiver so we can continue from the last
known position. Note that this will fail in case pghoard was shutdown uncleanly.
(kill -9 or such)
  • Loading branch information
Ormod committed Jul 14, 2016
1 parent bdf96ac commit a1f1e95
Show file tree
Hide file tree
Showing 10 changed files with 325 additions and 11 deletions.
3 changes: 3 additions & 0 deletions NEWS
Expand Up @@ -29,6 +29,9 @@ pghoard X.X.X (2016-XX-XX)
* Make sure object storage modules are importable when reading configuration
instead of only checking it when they are used
* Improved PostgreSQL 9.2, 9.6 beta and Python 3.3 compatibility
* Experimental support for PostgreSQL replication protocol. Much faster and
less resource consuming WAL replication than when using
active_backup_mode: pg_receivexlog.

pghoard 1.3.0 (2016-05-30)
==========================
Expand Down
15 changes: 10 additions & 5 deletions README.rst
Expand Up @@ -9,8 +9,8 @@ PGHoard |BuildStatus|_
Features:

* Automatic periodic basebackups
* Automatic transaction log (WAL/xlog) backups (using either ``pg_receivexlog``
or ``archive_command``)
* Automatic transaction log (WAL/xlog) backups (using either ``pg_receivexlog``,
``archive_command`` or PG native replication protocol with ``walreceiver``)
* Cloud object storage support (AWS S3, Google Cloud, OpenStack Swift, Azure, Ceph)
* Backup restoration directly from object storage, compressed and encrypted
* Point-in-time-recovery (PITR)
Expand Down Expand Up @@ -44,7 +44,8 @@ PGHoard supports multiple operating models. The basic mode where you have a
separate backup machine, ``pghoard`` can simply connect with
``pg_receivexlog`` to receive WAL files from the database as they're
written. Another model is to use ``pghoard_postgres_command`` as a
PostgreSQL ``archive_command``.
PostgreSQL ``archive_command``. There is also experimental support for PGHoard to
use PostgreSQL's native replication protocol with the ``walreceiver`` mode.

With both modes of operations PGHoard creates periodic basebackups using
``pg_basebackup`` that is run against the database in question.
Expand Down Expand Up @@ -179,7 +180,7 @@ installation.
0. Make sure PostgreSQL is configured to allow WAL archival and retrieval.
``postgresql.conf`` should have ``wal_level`` set to ``archive`` or
higher and ``max_wal_senders`` set to at least ``1`` (``archive_command`` mode)
or at least ``2`` (``pg_receivexlog`` mode), for example::
or at least ``2`` (``pg_receivexlog`` and ``walreceiver`` modes), for example::

wal_level = archive
max_wal_senders = 4
Expand Down Expand Up @@ -354,7 +355,11 @@ of new backups and to stop the deletion of old ones.
Can be either ``pg_receivexlog`` or ``archive_command``. If set to
``pg_receivexlog``, ``pghoard`` will start up a ``pg_receivexlog`` process to be
run against the database server. If ``archive_command`` is set, we rely on the
user setting the correct ``archive_command`` in ``postgresql.conf``.
user setting the correct ``archive_command`` in
``postgresql.conf``. You can also set this to the experimental ``walreceiver`` mode
whereby pghoard will start communicating directly with PostgreSQL
through the replication protocol. (Note requires an unreleased version
of psycopg2 library)

``alert_file_dir`` (default ``os.getcwd()``)

Expand Down
12 changes: 11 additions & 1 deletion pghoard/common.py
Expand Up @@ -5,8 +5,8 @@
See LICENSE for details
"""
from pghoard import pgutil
from pghoard.rohmu.compat import suppress
from pghoard.rohmu.errors import Error, InvalidConfigurationError
import contextlib
import datetime
import fcntl
import json
Expand All @@ -16,6 +16,16 @@
import tempfile
import time

try:
from contextlib import suppress
except ImportError:
# For pre Python3.4
@contextlib.contextmanager
def suppress(*exceptions):
try:
yield
except exceptions:
pass

LOG = logging.getLogger("pghoard.common")

Expand Down
10 changes: 8 additions & 2 deletions pghoard/compressor.py
Expand Up @@ -114,7 +114,10 @@ def handle_decompression_event(self, event):
def handle_event(self, event, filetype):
# pylint: disable=redefined-variable-type
rsa_public_key = None
site = event.get("site", self.find_site_for_file(event["full_path"]))
site = event.get("site")
if not site:
site = self.find_site_for_file(event["full_path"])

encryption_key_id = self.config["backup_sites"][site]["encryption_key_id"]
if encryption_key_id:
rsa_public_key = self.config["backup_sites"][site]["encryption_keys"][encryption_key_id]["public"]
Expand All @@ -127,7 +130,10 @@ def handle_event(self, event, filetype):
compressed_filepath = self.get_compressed_file_path(site, filetype, event["full_path"])
output_obj = NamedTemporaryFile(prefix=compressed_filepath, suffix=".tmp-compress")

with output_obj, open(event["full_path"], "rb") as input_obj:
input_obj = event.get("input_data")
if not input_obj:
input_obj = open(event["full_path"], "rb")
with output_obj, input_obj:
if filetype == "xlog":
wal.verify_wal(wal_name=os.path.basename(event["full_path"]), fileobj=input_obj)

Expand Down
54 changes: 52 additions & 2 deletions pghoard/pghoard.py
@@ -1,3 +1,4 @@

"""
pghoard - main pghoard daemon
Expand All @@ -11,6 +12,7 @@
create_alert_file,
get_object_storage_config,
replication_connection_string_and_slot_using_pgpass,
suppress,
write_json_file,
)
from pghoard.compressor import CompressorThread
Expand All @@ -35,6 +37,12 @@
import sys
import time

# Imported this way because WALReceiver requires an unreleased version of psycopg2
try:
from pghoard.walreceiver import WALReceiver
except ImportError:
WALReceiver = None


class PGHoard:
def __init__(self, config_path):
Expand Down Expand Up @@ -66,6 +74,7 @@ def __init__(self, config_path):
self.basebackups_callbacks = {}
self.receivexlogs = {}
self.compressors = []
self.walreceivers = {}
self.transfer_agents = []
self.requested_basebackup_sites = set()

Expand Down Expand Up @@ -172,6 +181,27 @@ def receivexlog_listener(self, site, connection_info, xlog_directory):
thread.start()
self.receivexlogs[site] = thread

def start_walreceiver(self, site, chosen_backup_node, last_flushed_lsn):
connection_string, slot = replication_connection_string_and_slot_using_pgpass(chosen_backup_node)
pg_version_server = self.check_pg_server_version(connection_string)
if pg_version_server:
self.config["backup_sites"][site]["pg_version"] = pg_version_server
if not WALReceiver:
self.log.error("Could not import WALReceiver, incorrect psycopg2 version?")
return

thread = WALReceiver(
config=self.config,
connection_string=connection_string,
compression_queue=self.compression_queue,
replication_slot=slot,
pg_version_server=pg_version_server,
site=site,
last_flushed_lsn=last_flushed_lsn,
stats=self.stats)
thread.start()
self.walreceivers[site] = thread

def create_backup_site_paths(self, site):
site_path = os.path.join(self.config["backup_location"], self.config["path_prefix"], site)
xlog_path = os.path.join(site_path, "xlog")
Expand Down Expand Up @@ -352,8 +382,20 @@ def handle_site(self, site, site_config):

chosen_backup_node = random.choice(site_config["nodes"])

if site not in self.receivexlogs and site_config["active_backup_mode"] == "pg_receivexlog":
self.receivexlog_listener(site, chosen_backup_node, xlog_path + "_incoming")
if site not in self.receivexlogs and site not in self.walreceivers:
if site_config["active_backup_mode"] == "pg_receivexlog":
self.receivexlog_listener(site, chosen_backup_node, xlog_path + "_incoming")
elif site_config["active_backup_mode"] == "walreceiver":
state_file_path = self.config["json_state_file_path"]
walreceiver_state = {}
with suppress(FileNotFoundError):
with open(state_file_path, "r") as fp:
old_state_file = json.load(fp)
walreceiver_state = old_state_file.get("walreceivers", {}).get(site, {})
self.start_walreceiver(
site=site,
chosen_backup_node=chosen_backup_node,
last_flushed_lsn=walreceiver_state.get("last_flushed_lsn"))

if site not in self.time_of_last_backup_check or \
time.monotonic() - self.time_of_last_backup_check[site] > 300:
Expand Down Expand Up @@ -416,6 +458,11 @@ def write_backup_state_to_json_file(self):
"""Periodically write a JSON state file to disk"""
start_time = time.time()
state_file_path = self.config["json_state_file_path"]
self.state["walreceivers"] = {
key: {"latest_activity": value.latest_activity, "running": value.running,
"last_flushed_lsn": value.last_flushed_lsn}
for key, value in self.walreceivers.items()
}
self.state["pg_receivexlogs"] = {
key: {"latest_activity": value.latest_activity, "running": value.running}
for key, value in self.receivexlogs.items()
Expand Down Expand Up @@ -477,10 +524,13 @@ def quit(self, _signal=None, _frame=None): # pylint: disable=unused-argument
all_threads = [self.webserver]
all_threads.extend(self.basebackups.values())
all_threads.extend(self.receivexlogs.values())
all_threads.extend(self.walreceivers.values())
all_threads.extend(self.compressors)
all_threads.extend(self.transfer_agents)
for t in all_threads:
t.running = False
# Write state file in the end so we get the last known state
self.write_backup_state_to_json_file()
for t in all_threads:
if t.is_alive():
t.join()
Expand Down
15 changes: 15 additions & 0 deletions pghoard/wal.py
Expand Up @@ -72,6 +72,21 @@ def name_for_tli_log_seg(tli, log, seg):
return "{:08X}{:08X}{:08X}".format(tli, log, seg)


def convert_integer_to_lsn(value):
log = value >> 32
pos = value & 0xFFFFFFFF
seg = pos // XLOG_SEG_SIZE
return log, pos, seg


def get_lsn_from_start_of_wal_file(lsn):
log_hex, seg_hex = lsn.split("/", 1)
log = int(log_hex, 16)
seg = int(seg_hex, 16) >> 24
pos = seg * XLOG_SEG_SIZE
return "{:X}/{:X}".format(log, pos)


def lsn_from_name(name):
_, log, seg = name_to_tli_log_seg(name)
pos = seg * XLOG_SEG_SIZE
Expand Down

0 comments on commit a1f1e95

Please sign in to comment.