diff --git a/pghoard/pghoard.py b/pghoard/pghoard.py
index 6e632873..bc7a5cec 100644
--- a/pghoard/pghoard.py
+++ b/pghoard/pghoard.py
@@ -23,7 +23,7 @@
 from pathlib import Path
 from queue import Empty, Queue
 from threading import Event
-from typing import Dict, List, Optional
+from typing import Dict, List, NamedTuple, Optional
 
 import psycopg2
 from rohmu import dates, get_transfer, rohmufile
@@ -39,7 +39,7 @@
     replication_connection_string_and_slot_using_pgpass, write_json_file
 )
 from pghoard.compressor import (
-    CompressionEvent, CompressionQueue, CompressorThread, WALFileDeleterThread, WalFileDeletionQueue
+    CompressionEvent, CompressionQueue, CompressorThread, WALFileDeleterThread, WalFileDeletionEvent, WalFileDeletionQueue
 )
 from pghoard.preservation_request import (
     is_basebackup_preserved, parse_preservation_requests, patch_basebackup_metadata_with_preservation
 )
@@ -56,6 +56,14 @@ class DeltaBaseBackupFailureInfo:
     retries: int = 0
 
 
+class BackupSitePaths(NamedTuple):
+    compressed_xlog_path: str
+    compressed_timeline_path: str
+    uncompressed_files_path: str
+    basebackup_path: str
+    uncompressed_basebackup_path: str
+
+
 class InotifyAdapter:
     def __init__(self, queue):
         self.queue = queue
@@ -287,24 +295,25 @@ def start_walreceiver(self, site, chosen_backup_node, last_flushed_lsn):
     def _get_site_prefix(self, site):
         return self.config["backup_sites"][site]["prefix"]
 
-    def create_backup_site_paths(self, site):
+    def create_backup_site_paths(self, site: str) -> BackupSitePaths:
         site_path = os.path.join(self.config["backup_location"], self._get_site_prefix(site))
         xlog_path = os.path.join(site_path, "xlog")
+        timeline_path = os.path.join(site_path, "timeline")
         basebackup_path = os.path.join(site_path, "basebackup")
 
-        paths_to_create = [
-            site_path,
-            xlog_path,
-            xlog_path + "_incoming",
-            basebackup_path,
-            basebackup_path + "_incoming",
-        ]
+        backup_site_paths = BackupSitePaths(
+            uncompressed_files_path=xlog_path + "_incoming",
+            compressed_xlog_path=xlog_path,
+            compressed_timeline_path=timeline_path,
+            basebackup_path=basebackup_path,
+            uncompressed_basebackup_path=basebackup_path + "_incoming",
+        )
 
-        for path in paths_to_create:
+        for path in backup_site_paths:
             if not os.path.exists(path):
                 os.makedirs(path)
 
-        return xlog_path, basebackup_path
+        return backup_site_paths
 
     def delete_remote_wal_before(self, wal_segment, site, pg_version):
         self.log.info("Starting WAL deletion from: %r before: %r, pg_version: %r", site, wal_segment, pg_version)
@@ -577,12 +586,15 @@ def startup_walk_for_missed_files(self):
         """Check xlog and xlog_incoming directories for files that receivexlog has received but not yet
         compressed as well as the files we have compressed but not yet uploaded and process them."""
         for site in self.config["backup_sites"]:
-            compressed_xlog_path, _ = self.create_backup_site_paths(site)
-            uncompressed_xlog_path = compressed_xlog_path + "_incoming"
+            backup_site_paths = self.create_backup_site_paths(site)
+
+            compressed_xlog_path = backup_site_paths.compressed_xlog_path
+            compressed_timeline_path = backup_site_paths.compressed_timeline_path
+            uncompressed_files_path = backup_site_paths.uncompressed_files_path
 
             # Process uncompressed files (ie WAL pg_receivexlog received)
-            for filename in os.listdir(uncompressed_xlog_path):
-                full_path = os.path.join(uncompressed_xlog_path, filename)
+            for filename in os.listdir(uncompressed_files_path):
+                full_path = os.path.join(uncompressed_files_path, filename)
                 if wal.PARTIAL_WAL_RE.match(filename):
                     # pg_receivewal may have been in the middle of storing WAL file when PGHoard was stopped.
                     # If the file is 0 or 16 MiB in size it will continue normally but in some cases the file can be
@@ -609,6 +621,34 @@ def startup_walk_for_missed_files(self):
 
                 filetype = FileType.Timeline if wal.TIMELINE_RE.match(filename) else FileType.Wal
 
+                # verify if file was already compressed, otherwise the transfer agent will encounter
+                # duplicated UploadEvents. In case it was compressed, we should just add it to the deletion queue
+                base_compressed_file_path = (
+                    compressed_timeline_path if filetype is FileType.Timeline else compressed_xlog_path
+                )
+                compressed_file_path = os.path.join(base_compressed_file_path, filename)
+                is_already_compressed = os.path.exists(compressed_file_path)
+                has_metadata_file = os.path.exists(compressed_file_path + ".metadata")
+
+                # the file was compressed correctly
+                if is_already_compressed and has_metadata_file:
+                    self.log.debug("Uncompressed file %r is already compressed, adding to deletion queue.", full_path)
+                    if filetype is FileType.Timeline:
+                        os.unlink(full_path)
+                    else:
+                        delete_request = WalFileDeletionEvent(backup_site_name=site, file_path=Path(full_path))
+                        self.log.info("Adding an Uncompressed WAL file to deletion queue: %s", full_path)
+                        self.wal_file_deletion_queue.put(delete_request)
+                    continue
+
+                # delete compressed file and re-try
+                if is_already_compressed and not has_metadata_file:
+                    self.log.info(
+                        "Deleting incomplete compressed file %r (missing metadata), compression will be re-tried",
+                        compressed_file_path,
+                    )
+                    os.unlink(compressed_file_path)
+
                 compression_event = CompressionEvent(
                     file_type=filetype,
                     file_path=FileTypePrefixes[filetype] / filename,
@@ -622,32 +662,35 @@ def startup_walk_for_missed_files(self):
                 self.compression_queue.put(compression_event)
 
             # Process compressed files (ie things we've processed but not yet uploaded)
-            for filename in os.listdir(compressed_xlog_path):
-                if filename.endswith(".metadata"):
-                    continue  # silently ignore .metadata files, they're expected and processed below
-                full_path = os.path.join(compressed_xlog_path, filename)
-                metadata_path = full_path + ".metadata"
-                is_xlog = wal.WAL_RE.match(filename)
-                is_timeline = wal.TIMELINE_RE.match(filename)
-                if not ((is_xlog or is_timeline) and os.path.exists(metadata_path)):
-                    self.log.warning("Found invalid file %r from compressed xlog directory", full_path)
-                    continue
-                with open(metadata_path, "r") as fp:
-                    metadata = json.load(fp)
-
-                file_type = FileType.Wal if is_xlog else FileType.Timeline
-
-                transfer_event = UploadEvent(
-                    file_type=file_type,
-                    backup_site_name=site,
-                    file_size=os.path.getsize(full_path),
-                    file_path=FileTypePrefixes[file_type] / filename,
-                    source_data=Path(full_path),
-                    callback_queue=None,
-                    metadata=metadata
-                )
-                self.log.debug("Found: %r when starting up, adding to transfer queue", transfer_event)
-                self.transfer_queue.put(transfer_event)
+            for compressed_file_dir in [compressed_xlog_path, compressed_timeline_path]:
+                for filename in os.listdir(compressed_file_dir):
+                    if filename.endswith(".metadata"):
+                        continue  # silently ignore .metadata files, they're expected and processed below
+
+                    full_path = os.path.join(compressed_file_dir, filename)
+                    metadata_path = full_path + ".metadata"
+                    is_xlog = wal.WAL_RE.match(filename)
+                    is_timeline = wal.TIMELINE_RE.match(filename)
+
+                    if not ((is_xlog or is_timeline) and os.path.exists(metadata_path)):
+                        self.log.warning("Found invalid file %r from compressed xlog directory", full_path)
+                        continue
+                    with open(metadata_path, "r") as fp:
+                        metadata = json.load(fp)
+
+                    file_type = FileType.Wal if is_xlog else FileType.Timeline
+
+                    transfer_event = UploadEvent(
+                        file_type=file_type,
+                        backup_site_name=site,
+                        file_size=os.path.getsize(full_path),
+                        file_path=FileTypePrefixes[file_type] / filename,
+                        source_data=Path(full_path),
+                        callback_queue=None,
+                        metadata=metadata
+                    )
+                    self.log.debug("Found: %r when starting up, adding to transfer queue", transfer_event)
+                    self.transfer_queue.put(transfer_event)
 
     def start_threads_on_startup(self):
         # Startup threads
@@ -668,7 +711,7 @@ def _cleanup_inactive_receivexlogs(self, site):
 
     def handle_site(self, site, site_config):
         self.set_state_defaults(site)
-        xlog_path, basebackup_path = self.create_backup_site_paths(site)
+        backup_site_paths = self.create_backup_site_paths(site)
 
         if not site_config["active"]:
             return  # If a site has been marked inactive, don't bother checking anything
@@ -679,7 +722,7 @@ def handle_site(self, site, site_config):
 
         if site not in self.receivexlogs and site not in self.walreceivers:
             if site_config["active_backup_mode"] == "pg_receivexlog":
-                self.receivexlog_listener(site, chosen_backup_node, xlog_path + "_incoming")
+                self.receivexlog_listener(site, chosen_backup_node, backup_site_paths.uncompressed_files_path)
             elif site_config["active_backup_mode"] == "walreceiver":
                 state_file_path = self.config["json_state_file_path"]
                 walreceiver_state = {}
@@ -745,7 +788,13 @@ def handle_site(self, site, site_config):
             return
 
         self.basebackups_callbacks[site] = Queue()
-        self.create_basebackup(site, chosen_backup_node, basebackup_path, self.basebackups_callbacks[site], metadata)
+        self.create_basebackup(
+            site=site,
+            connection_info=chosen_backup_node,
+            basebackup_path=backup_site_paths.basebackup_path,
+            callback_queue=self.basebackups_callbacks[site],
+            metadata=metadata,
+        )
 
     def get_new_backup_details(self, *, now=None, site, site_config):
         """Returns metadata to associate with new backup that needs to be created or None in case no backup should
diff --git a/test/test_compressor.py b/test/test_compressor.py
index 25506e63..5482758e 100644
--- a/test/test_compressor.py
+++ b/test/test_compressor.py
@@ -176,6 +176,13 @@ def _test_compress_to_file(self, filetype, file_size, file_path):
             assert transfer_event.metadata.pop("hash-algorithm") == "sha1"
             assert getattr(transfer_event, key) == value
 
+        compressed_file_path = os.path.join(
+            self.config["backup_location"], self.config["backup_sites"][self.test_site]["prefix"],
+            "xlog" if filetype == "xlog" else "timeline", file_path.name
+        )
+        # make sure the file was compressed on the expected location
+        assert os.path.exists(compressed_file_path)
+
     def test_compress_to_memory(self):
         ifile = WALTester(self.incoming_path, "0000000100000000000000FF", "random")
         filetype = FileType.Wal
diff --git a/test/test_pghoard.py b/test/test_pghoard.py
index f3170b73..a968e8d4 100644
--- a/test/test_pghoard.py
+++ b/test/test_pghoard.py
@@ -696,26 +696,28 @@ def test_backup_state_file(self):
         assert empty_state == state
 
     def test_startup_walk_for_missed_compressed_files(self):
-        compressed_wal_path, _ = self.pghoard.create_backup_site_paths(self.test_site)
-        with open(os.path.join(compressed_wal_path, "000000010000000000000004"), "wb") as fp:
+        backup_site_paths = self.pghoard.create_backup_site_paths(self.test_site)
+        with open(os.path.join(backup_site_paths.compressed_xlog_path, "000000010000000000000004"), "wb") as fp:
             fp.write(b"foo")
-        with open(os.path.join(compressed_wal_path, "000000010000000000000004.metadata"), "wb") as fp:
+        with open(os.path.join(backup_site_paths.compressed_xlog_path, "000000010000000000000004.metadata"), "wb") as fp:
             fp.write(b"{}")
-        with open(os.path.join(compressed_wal_path, "0000000F.history"), "wb") as fp:
+        with open(os.path.join(backup_site_paths.compressed_timeline_path, "0000000F.history"), "wb") as fp:
             fp.write(b"foo")
-        with open(os.path.join(compressed_wal_path, "0000000F.history.metadata"), "wb") as fp:
+        with open(os.path.join(backup_site_paths.compressed_timeline_path, "0000000F.history.metadata"), "wb") as fp:
             fp.write(b"{}")
-        with open(os.path.join(compressed_wal_path, "000000010000000000000004xyz"), "wb") as fp:
+        with open(os.path.join(backup_site_paths.compressed_xlog_path, "000000010000000000000004xyz"), "wb") as fp:
             fp.write(b"foo")
-        with open(os.path.join(compressed_wal_path, "000000010000000000000004xyz.metadata"), "wb") as fp:
+        with open(os.path.join(backup_site_paths.compressed_xlog_path, "000000010000000000000004xyz.metadata"), "wb") as fp:
             fp.write(b"{}")
         self.pghoard.startup_walk_for_missed_files()
         assert self.pghoard.compression_queue.qsize() == 0
         assert self.pghoard.transfer_queue.qsize() == 2
 
     def test_startup_walk_for_missed_uncompressed_files(self):
-        compressed_wal_path, _ = self.pghoard.create_backup_site_paths(self.test_site)
-        uncompressed_wal_path = compressed_wal_path + "_incoming"
+        backup_site_paths = self.pghoard.create_backup_site_paths(self.test_site)
+
+        uncompressed_wal_path = backup_site_paths.uncompressed_files_path
+
         with open(os.path.join(uncompressed_wal_path, "000000010000000000000004"), "wb") as fp:
             fp.write(b"foo")
         with open(os.path.join(uncompressed_wal_path, "00000002.history"), "wb") as fp:
@@ -730,9 +732,8 @@
         "file_type, file_name", [(FileType.Wal, "000000010000000000000004"), (FileType.Timeline, "00000002.history")]
     )
     def test_startup_walk_for_missed_uncompressed_file_type(self, file_type: FileType, file_name: str):
-        compressed_wal_path, _ = self.pghoard.create_backup_site_paths(self.test_site)
-        uncompressed_wal_path = compressed_wal_path + "_incoming"
-        with open(os.path.join(uncompressed_wal_path, file_name), "wb") as fp:
+        backup_site_paths = self.pghoard.create_backup_site_paths(self.test_site)
+        with open(os.path.join(backup_site_paths.uncompressed_files_path, file_name), "wb") as fp:
             fp.write(b"foo")
         self.pghoard.startup_walk_for_missed_files()
         assert self.pghoard.compression_queue.qsize() == 1
@@ -744,10 +745,16 @@ def test_startup_walk_for_missed_uncompressed_file_type(self, file_type: FileTyp
         "file_type, file_name", [(FileType.Wal, "000000010000000000000005"), (FileType.Timeline, "00000003.history")]
     )
     def test_startup_walk_for_missed_compressed_file_type(self, file_type: FileType, file_name: str):
-        compressed_wal_path, _ = self.pghoard.create_backup_site_paths(self.test_site)
-        with open(os.path.join(compressed_wal_path, file_name), "wb") as fp:
+        backup_site_paths = self.pghoard.create_backup_site_paths(self.test_site)
+
+        if file_type is FileType.Wal:
+            compressed_file_path = backup_site_paths.compressed_xlog_path
+        else:
+            compressed_file_path = backup_site_paths.compressed_timeline_path
+
+        with open(os.path.join(compressed_file_path, file_name), "wb") as fp:
             fp.write(b"foo")
-        with open(os.path.join(compressed_wal_path, f"{file_name}.metadata"), "wb") as fp:
+        with open(os.path.join(compressed_file_path, f"{file_name}.metadata"), "wb") as fp:
             fp.write(b"{}")
         self.pghoard.startup_walk_for_missed_files()
         assert self.pghoard.compression_queue.qsize() == 0
@@ -755,6 +762,63 @@ def test_startup_walk_for_missed_compressed_file_type(self, file_type: FileType,
         upload_event = self.pghoard.transfer_queue.get(timeout=1.0)
         assert upload_event.file_type == file_type
 
+    @pytest.mark.parametrize(
+        "file_type, file_name, invalid_compressed_file_name", [
+            (FileType.Wal, "000000010000000000000005", "000000010000000000000006"),
+            (FileType.Timeline, "00000003.history", "00000004.history"),
+        ]
+    )
+    def test_startup_walk_skip_compression_if_already_compressed(
+        self,
+        file_type: FileType,
+        file_name: str,
+        invalid_compressed_file_name: str,
+    ) -> None:
+        """
+        Tests the scenario where an uncompressed file was already compressed, but was not deleted.
+        """
+        backup_site_paths = self.pghoard.create_backup_site_paths(self.test_site)
+        uncompressed_wal_path = backup_site_paths.uncompressed_files_path
+        compressed_file_path = (
+            backup_site_paths.compressed_timeline_path
+            if file_type is FileType.Timeline else backup_site_paths.compressed_xlog_path
+        )
+
+        # generate uncompressed/compressed files
+        with open(os.path.join(uncompressed_wal_path, file_name), "wb") as fp:
+            fp.write(b"random")
+
+        with open(os.path.join(uncompressed_wal_path, invalid_compressed_file_name), "wb") as fp:
+            fp.write(b"random")
+
+        # compressed
+        with open(os.path.join(compressed_file_path, file_name), "wb") as fp:
+            fp.write(b"random")
+
+        with open(os.path.join(compressed_file_path, f"{file_name}.metadata"), "wb") as fp:
+            fp.write(b"{}")
+
+        # invalid compressed file should not have a metadata
+        with open(os.path.join(compressed_file_path, invalid_compressed_file_name), "wb") as fp:
+            fp.write(b"random")
+
+        self.pghoard.startup_walk_for_missed_files()
+
+        # only one file should be added for compression (invalid compressed one)
+        assert self.pghoard.compression_queue.qsize() == 1
+        compression_event = self.pghoard.compression_queue.get()
+        assert compression_event.file_path.name == invalid_compressed_file_name
+
+        assert self.pghoard.transfer_queue.qsize() == 1
+        upload_event = self.pghoard.transfer_queue.get()
+        assert upload_event.file_path.name == file_name
+
+        if file_type is FileType.Wal:
+            assert self.pghoard.wal_file_deletion_queue.qsize() == 1
+        else:
+            # uncompressed timeline files are not added to deletion queue, they are immediately unlinked
+            assert self.pghoard.wal_file_deletion_queue.qsize() == 0
+
 
 class TestPGHoardWithPG:
     def test_auth_alert_files(self, db, pghoard):