From 0e8ed9787e380040b5c4929362618cff2b36f8be Mon Sep 17 00:00:00 2001 From: Kathia Barahona Date: Mon, 3 Jul 2023 13:44:12 +0200 Subject: [PATCH 1/5] verify uncompressed file does not exists under /xlog and /timeline (is already compressed) [BF-1797] --- pghoard/pghoard.py | 71 +++++++++++++++++++++++--------- test/test_compressor.py | 7 ++++ test/test_pghoard.py | 90 ++++++++++++++++++++++++++++++++++------- 3 files changed, 134 insertions(+), 34 deletions(-) diff --git a/pghoard/pghoard.py b/pghoard/pghoard.py index 6e632873..3d451b9d 100644 --- a/pghoard/pghoard.py +++ b/pghoard/pghoard.py @@ -23,7 +23,7 @@ from pathlib import Path from queue import Empty, Queue from threading import Event -from typing import Dict, List, Optional +from typing import Dict, List, NamedTuple, Optional import psycopg2 from rohmu import dates, get_transfer, rohmufile @@ -39,7 +39,7 @@ replication_connection_string_and_slot_using_pgpass, write_json_file ) from pghoard.compressor import ( - CompressionEvent, CompressionQueue, CompressorThread, WALFileDeleterThread, WalFileDeletionQueue + CompressionEvent, CompressionQueue, CompressorThread, WALFileDeleterThread, WalFileDeletionEvent, WalFileDeletionQueue ) from pghoard.preservation_request import ( is_basebackup_preserved, parse_preservation_requests, patch_basebackup_metadata_with_preservation @@ -56,6 +56,14 @@ class DeltaBaseBackupFailureInfo: retries: int = 0 +class BackupSitePaths(NamedTuple): + compressed_xlog_path: str + compressed_timeline_path: str + uncompressed_files_path: str + basebackup_path: str + uncompressed_basebackup_path: str + + class InotifyAdapter: def __init__(self, queue): self.queue = queue @@ -287,24 +295,25 @@ def start_walreceiver(self, site, chosen_backup_node, last_flushed_lsn): def _get_site_prefix(self, site): return self.config["backup_sites"][site]["prefix"] - def create_backup_site_paths(self, site): + def create_backup_site_paths(self, site: str) -> BackupSitePaths: site_path = os.path.join(self.config["backup_location"], self._get_site_prefix(site)) xlog_path = os.path.join(site_path, "xlog") + timeline_path = os.path.join(site_path, "timeline") basebackup_path = os.path.join(site_path, "basebackup") - paths_to_create = [ - site_path, - xlog_path, - xlog_path + "_incoming", - basebackup_path, - basebackup_path + "_incoming", - ] + backup_site_paths = BackupSitePaths( + uncompressed_files_path=xlog_path + "_incoming", + compressed_xlog_path=xlog_path, + compressed_timeline_path=timeline_path, + basebackup_path=basebackup_path, + uncompressed_basebackup_path=basebackup_path + "_incoming", + ) - for path in paths_to_create: + for path in backup_site_paths: if not os.path.exists(path): os.makedirs(path) - return xlog_path, basebackup_path + return backup_site_paths def delete_remote_wal_before(self, wal_segment, site, pg_version): self.log.info("Starting WAL deletion from: %r before: %r, pg_version: %r", site, wal_segment, pg_version) @@ -577,12 +586,15 @@ def startup_walk_for_missed_files(self): """Check xlog and xlog_incoming directories for files that receivexlog has received but not yet compressed as well as the files we have compressed but not yet uploaded and process them.""" for site in self.config["backup_sites"]: - compressed_xlog_path, _ = self.create_backup_site_paths(site) - uncompressed_xlog_path = compressed_xlog_path + "_incoming" + backup_site_paths = self.create_backup_site_paths(site) + + compressed_xlog_path = backup_site_paths.compressed_xlog_path + compressed_timeline_path = backup_site_paths.compressed_timeline_path + uncompressed_files_path = backup_site_paths.uncompressed_files_path # Process uncompressed files (ie WAL pg_receivexlog received) - for filename in os.listdir(uncompressed_xlog_path): - full_path = os.path.join(uncompressed_xlog_path, filename) + for filename in os.listdir(uncompressed_files_path): + full_path = os.path.join(uncompressed_files_path, filename) if wal.PARTIAL_WAL_RE.match(filename): # pg_receivewal may have been in the middle of storing WAL file when PGHoard was stopped. # If the file is 0 or 16 MiB in size it will continue normally but in some cases the file can be @@ -609,6 +621,21 @@ def startup_walk_for_missed_files(self): filetype = FileType.Timeline if wal.TIMELINE_RE.match(filename) else FileType.Wal + # verify if file was already compressed, otherwise the transfer agent will encounter + # duplicated UploadEvents. In case it was compressed, we should just add it to the deletion queue + base_compressed_file_path = ( + compressed_timeline_path if filetype is FileType.Timeline else compressed_xlog_path + ) + if os.path.isfile(os.path.join(base_compressed_file_path, filename)): + self.log.debug("Uncompressed file %r is already compressed, adding to deletion queue.", full_path) + if filetype is FileType.Timeline: + os.unlink(full_path) + else: + delete_request = WalFileDeletionEvent(backup_site_name=site, file_path=Path(full_path)) + self.wal_file_deletion_queue.put(delete_request) + self.log.info("Adding to Uncompressed WAL file to deletion queue: %s", full_path) + continue + compression_event = CompressionEvent( file_type=filetype, file_path=FileTypePrefixes[filetype] / filename, @@ -668,7 +695,7 @@ def _cleanup_inactive_receivexlogs(self, site): def handle_site(self, site, site_config): self.set_state_defaults(site) - xlog_path, basebackup_path = self.create_backup_site_paths(site) + backup_site_paths = self.create_backup_site_paths(site) if not site_config["active"]: return # If a site has been marked inactive, don't bother checking anything @@ -679,7 +706,7 @@ def handle_site(self, site, site_config): if site not in self.receivexlogs and site not in self.walreceivers: if site_config["active_backup_mode"] == "pg_receivexlog": - self.receivexlog_listener(site, chosen_backup_node, xlog_path + "_incoming") + self.receivexlog_listener(site, chosen_backup_node, backup_site_paths.uncompressed_files_path) elif site_config["active_backup_mode"] == "walreceiver": state_file_path = self.config["json_state_file_path"] walreceiver_state = {} @@ -745,7 +772,13 @@ def handle_site(self, site, site_config): return self.basebackups_callbacks[site] = Queue() - self.create_basebackup(site, chosen_backup_node, basebackup_path, self.basebackups_callbacks[site], metadata) + self.create_basebackup( + site=site, + connection_info=chosen_backup_node, + basebackup_path=backup_site_paths.basebackup_path, + callback_queue=self.basebackups_callbacks[site], + metadata=metadata, + ) def get_new_backup_details(self, *, now=None, site, site_config): """Returns metadata to associate with new backup that needs to be created or None in case no backup should diff --git a/test/test_compressor.py b/test/test_compressor.py index 25506e63..5482758e 100644 --- a/test/test_compressor.py +++ b/test/test_compressor.py @@ -176,6 +176,13 @@ def _test_compress_to_file(self, filetype, file_size, file_path): assert transfer_event.metadata.pop("hash-algorithm") == "sha1" assert getattr(transfer_event, key) == value + compressed_file_path = os.path.join( + self.config["backup_location"], self.config["backup_sites"][self.test_site]["prefix"], + "xlog" if filetype == "xlog" else "timeline", file_path.name + ) + # make sure the file was compressed on the expected location + assert os.path.exists(compressed_file_path) + def test_compress_to_memory(self): ifile = WALTester(self.incoming_path, "0000000100000000000000FF", "random") filetype = FileType.Wal diff --git a/test/test_pghoard.py b/test/test_pghoard.py index f3170b73..1be29ff9 100644 --- a/test/test_pghoard.py +++ b/test/test_pghoard.py @@ -696,26 +696,28 @@ def test_backup_state_file(self): assert empty_state == state def test_startup_walk_for_missed_compressed_files(self): - compressed_wal_path, _ = self.pghoard.create_backup_site_paths(self.test_site) - with open(os.path.join(compressed_wal_path, "000000010000000000000004"), "wb") as fp: + backup_site_paths = self.pghoard.create_backup_site_paths(self.test_site) + with open(os.path.join(backup_site_paths.compressed_xlog_path, "000000010000000000000004"), "wb") as fp: fp.write(b"foo") - with open(os.path.join(compressed_wal_path, "000000010000000000000004.metadata"), "wb") as fp: + with open(os.path.join(backup_site_paths.compressed_xlog_path, "000000010000000000000004.metadata"), "wb") as fp: fp.write(b"{}") - with open(os.path.join(compressed_wal_path, "0000000F.history"), "wb") as fp: + with open(os.path.join(backup_site_paths.compressed_timeline_path, "0000000F.history"), "wb") as fp: fp.write(b"foo") - with open(os.path.join(compressed_wal_path, "0000000F.history.metadata"), "wb") as fp: + with open(os.path.join(backup_site_paths.compressed_timeline_path, "0000000F.history.metadata"), "wb") as fp: fp.write(b"{}") - with open(os.path.join(compressed_wal_path, "000000010000000000000004xyz"), "wb") as fp: + with open(os.path.join(backup_site_paths.compressed_xlog_path, "000000010000000000000004xyz"), "wb") as fp: fp.write(b"foo") - with open(os.path.join(compressed_wal_path, "000000010000000000000004xyz.metadata"), "wb") as fp: + with open(os.path.join(backup_site_paths.compressed_xlog_path, "000000010000000000000004xyz.metadata"), "wb") as fp: fp.write(b"{}") self.pghoard.startup_walk_for_missed_files() assert self.pghoard.compression_queue.qsize() == 0 assert self.pghoard.transfer_queue.qsize() == 2 def test_startup_walk_for_missed_uncompressed_files(self): - compressed_wal_path, _ = self.pghoard.create_backup_site_paths(self.test_site) - uncompressed_wal_path = compressed_wal_path + "_incoming" + backup_site_paths = self.pghoard.create_backup_site_paths(self.test_site) + + uncompressed_wal_path = backup_site_paths.uncompressed_files_path + with open(os.path.join(uncompressed_wal_path, "000000010000000000000004"), "wb") as fp: fp.write(b"foo") with open(os.path.join(uncompressed_wal_path, "00000002.history"), "wb") as fp: @@ -730,9 +732,8 @@ def test_startup_walk_for_missed_uncompressed_files(self): "file_type, file_name", [(FileType.Wal, "000000010000000000000004"), (FileType.Timeline, "00000002.history")] ) def test_startup_walk_for_missed_uncompressed_file_type(self, file_type: FileType, file_name: str): - compressed_wal_path, _ = self.pghoard.create_backup_site_paths(self.test_site) - uncompressed_wal_path = compressed_wal_path + "_incoming" - with open(os.path.join(uncompressed_wal_path, file_name), "wb") as fp: + backup_site_paths = self.pghoard.create_backup_site_paths(self.test_site) + with open(os.path.join(backup_site_paths.uncompressed_files_path, file_name), "wb") as fp: fp.write(b"foo") self.pghoard.startup_walk_for_missed_files() assert self.pghoard.compression_queue.qsize() == 1 @@ -744,10 +745,16 @@ def test_startup_walk_for_missed_uncompressed_file_type(self, file_type: FileTyp "file_type, file_name", [(FileType.Wal, "000000010000000000000005"), (FileType.Timeline, "00000003.history")] ) def test_startup_walk_for_missed_compressed_file_type(self, file_type: FileType, file_name: str): - compressed_wal_path, _ = self.pghoard.create_backup_site_paths(self.test_site) - with open(os.path.join(compressed_wal_path, file_name), "wb") as fp: + backup_site_paths = self.pghoard.create_backup_site_paths(self.test_site) + + if file_type is FileType.Wal: + compressed_file_path = backup_site_paths.compressed_xlog_path + else: + compressed_file_path = backup_site_paths.compressed_timeline_path + + with open(os.path.join(compressed_file_path, file_name), "wb") as fp: fp.write(b"foo") - with open(os.path.join(compressed_wal_path, f"{file_name}.metadata"), "wb") as fp: + with open(os.path.join(compressed_file_path, f"{file_name}.metadata"), "wb") as fp: fp.write(b"{}") self.pghoard.startup_walk_for_missed_files() assert self.pghoard.compression_queue.qsize() == 0 @@ -755,6 +762,59 @@ def test_startup_walk_for_missed_compressed_file_type(self, file_type: FileType, upload_event = self.pghoard.transfer_queue.get(timeout=1.0) assert upload_event.file_type == file_type + @pytest.mark.parametrize( + "file_type, file_name, invalid_compressed_file_name", [ + (FileType.Wal, "000000010000000000000005", "000000010000000000000006"), + (FileType.Timeline, "00000003.history", "00000004.history"), + ] + ) + def test_startup_walk_skip_compression_if_already_compressed( + self, + file_type: FileType, + file_name: str, + invalid_compressed_file_name: str, + ) -> None: + """ + Tests the scenario where an uncompressed file was already compressed, but was not deleted. + """ + backup_site_paths = self.pghoard.create_backup_site_paths(self.test_site) + uncompressed_wal_path = backup_site_paths.uncompressed_files_path + compressed_file_path = ( + backup_site_paths.compressed_timeline_path + if file_type is FileType.Timeline else backup_site_paths.compressed_xlog_path + ) + + # generate uncompressed/compressed files + with open(os.path.join(uncompressed_wal_path, file_name), "wb") as fp: + fp.write(b"random") + + with open(os.path.join(uncompressed_wal_path, invalid_compressed_file_name), "wb") as fp: + fp.write(b"random") + + # compressed + with open(os.path.join(compressed_file_path, file_name), "wb") as fp: + fp.write(b"random") + + with open(os.path.join(compressed_file_path, f"{file_name}.metadata"), "wb") as fp: + fp.write(b"{}") + + # invalid compressed file should not have a metadata + with open(os.path.join(compressed_file_path, invalid_compressed_file_name), "wb") as fp: + fp.write(b"random") + + self.pghoard.startup_walk_for_missed_files() + + # only one file should be added for compression (invalid compressed one) + assert self.pghoard.compression_queue.qsize() == 1 + + assert self.pghoard.transfer_queue.qsize() == 1 + + if file_type is FileType.Wal: + assert self.pghoard.wal_file_deletion_queue.qsize() == 1 + else: + # uncompressed timeline files are not added to deletion queue, they are immediately unlinked + assert self.pghoard.wal_file_deletion_queue.qsize() == 0 + class TestPGHoardWithPG: def test_auth_alert_files(self, db, pghoard): From 87a40200e0aac491010a0ed067bf03130fa43d07 Mon Sep 17 00:00:00 2001 From: Kathia Barahona Date: Tue, 4 Jul 2023 10:25:34 +0200 Subject: [PATCH 2/5] upload compressed timelines from /timeline during startup walk --- pghoard/pghoard.py | 55 ++++++++++++++++++++++++---------------------- 1 file changed, 29 insertions(+), 26 deletions(-) diff --git a/pghoard/pghoard.py b/pghoard/pghoard.py index 3d451b9d..eb276871 100644 --- a/pghoard/pghoard.py +++ b/pghoard/pghoard.py @@ -649,32 +649,35 @@ def startup_walk_for_missed_files(self): self.compression_queue.put(compression_event) # Process compressed files (ie things we've processed but not yet uploaded) - for filename in os.listdir(compressed_xlog_path): - if filename.endswith(".metadata"): - continue # silently ignore .metadata files, they're expected and processed below - full_path = os.path.join(compressed_xlog_path, filename) - metadata_path = full_path + ".metadata" - is_xlog = wal.WAL_RE.match(filename) - is_timeline = wal.TIMELINE_RE.match(filename) - if not ((is_xlog or is_timeline) and os.path.exists(metadata_path)): - self.log.warning("Found invalid file %r from compressed xlog directory", full_path) - continue - with open(metadata_path, "r") as fp: - metadata = json.load(fp) - - file_type = FileType.Wal if is_xlog else FileType.Timeline - - transfer_event = UploadEvent( - file_type=file_type, - backup_site_name=site, - file_size=os.path.getsize(full_path), - file_path=FileTypePrefixes[file_type] / filename, - source_data=Path(full_path), - callback_queue=None, - metadata=metadata - ) - self.log.debug("Found: %r when starting up, adding to transfer queue", transfer_event) - self.transfer_queue.put(transfer_event) + for compressed_file_dir in [compressed_xlog_path, compressed_timeline_path]: + for filename in os.listdir(compressed_file_dir): + if filename.endswith(".metadata"): + continue # silently ignore .metadata files, they're expected and processed below + + full_path = os.path.join(compressed_file_dir, filename) + metadata_path = full_path + ".metadata" + is_xlog = wal.WAL_RE.match(filename) + is_timeline = wal.TIMELINE_RE.match(filename) + + if not ((is_xlog or is_timeline) and os.path.exists(metadata_path)): + self.log.warning("Found invalid file %r from compressed xlog directory", full_path) + continue + with open(metadata_path, "r") as fp: + metadata = json.load(fp) + + file_type = FileType.Wal if is_xlog else FileType.Timeline + + transfer_event = UploadEvent( + file_type=file_type, + backup_site_name=site, + file_size=os.path.getsize(full_path), + file_path=FileTypePrefixes[file_type] / filename, + source_data=Path(full_path), + callback_queue=None, + metadata=metadata + ) + self.log.debug("Found: %r when starting up, adding to transfer queue", transfer_event) + self.transfer_queue.put(transfer_event) def start_threads_on_startup(self): # Startup threads From 3815837816624e39af291189279a92795a53517f Mon Sep 17 00:00:00 2001 From: Kathia Barahona Date: Tue, 4 Jul 2023 10:28:26 +0200 Subject: [PATCH 3/5] re-compress in case the compressed file has no metadata --- pghoard/pghoard.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/pghoard/pghoard.py b/pghoard/pghoard.py index eb276871..784d225f 100644 --- a/pghoard/pghoard.py +++ b/pghoard/pghoard.py @@ -626,7 +626,12 @@ def startup_walk_for_missed_files(self): base_compressed_file_path = ( compressed_timeline_path if filetype is FileType.Timeline else compressed_xlog_path ) - if os.path.isfile(os.path.join(base_compressed_file_path, filename)): + compressed_file_path = os.path.join(base_compressed_file_path, filename) + is_already_compressed = os.path.exists(compressed_file_path) + has_metadata_file = os.path.exists(compressed_file_path + ".metadata") + + # the file was compressed correctly + if is_already_compressed and has_metadata_file: self.log.debug("Uncompressed file %r is already compressed, adding to deletion queue.", full_path) if filetype is FileType.Timeline: os.unlink(full_path) @@ -636,6 +641,13 @@ def startup_walk_for_missed_files(self): self.log.info("Adding to Uncompressed WAL file to deletion queue: %s", full_path) continue + # delete compressed file and re-try + if is_already_compressed and not has_metadata_file: + self.log.info( + "Deleting invalid compressed file %r, compression will be re-tried", compressed_file_path, + ) + os.unlink(compressed_file_path) + compression_event = CompressionEvent( file_type=filetype, file_path=FileTypePrefixes[filetype] / filename, From db82d1a19fa8c2c79796942e52a4c36156375a35 Mon Sep 17 00:00:00 2001 From: Kathia Barahona Date: Tue, 4 Jul 2023 09:07:19 +0000 Subject: [PATCH 4/5] improved log messages --- pghoard/pghoard.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pghoard/pghoard.py b/pghoard/pghoard.py index 784d225f..bc7a5cec 100644 --- a/pghoard/pghoard.py +++ b/pghoard/pghoard.py @@ -637,14 +637,15 @@ def startup_walk_for_missed_files(self): os.unlink(full_path) else: delete_request = WalFileDeletionEvent(backup_site_name=site, file_path=Path(full_path)) + self.log.info("Adding an Uncompressed WAL file to deletion queue: %s", full_path) self.wal_file_deletion_queue.put(delete_request) - self.log.info("Adding to Uncompressed WAL file to deletion queue: %s", full_path) continue # delete compressed file and re-try if is_already_compressed and not has_metadata_file: self.log.info( - "Deleting invalid compressed file %r, compression will be re-tried", compressed_file_path, + "Deleting incomplete compressed file %r (missing metadata), compression will be re-tried", + compressed_file_path, ) os.unlink(compressed_file_path) From 60d6bf64a5801efe430f5b722bb53c9e8e330e7f Mon Sep 17 00:00:00 2001 From: Kathia Barahona Date: Tue, 4 Jul 2023 11:39:44 +0200 Subject: [PATCH 5/5] verify correct files are compressed and uploaded in test_startup_walk_skip_compression_if_already_compressed --- test/test_pghoard.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/test/test_pghoard.py b/test/test_pghoard.py index 1be29ff9..a968e8d4 100644 --- a/test/test_pghoard.py +++ b/test/test_pghoard.py @@ -806,8 +806,12 @@ def test_startup_walk_skip_compression_if_already_compressed( # only one file should be added for compression (invalid compressed one) assert self.pghoard.compression_queue.qsize() == 1 + compression_event = self.pghoard.compression_queue.get() + assert compression_event.file_path.name == invalid_compressed_file_name assert self.pghoard.transfer_queue.qsize() == 1 + upload_event = self.pghoard.transfer_queue.get() + assert upload_event.file_path.name == file_name if file_type is FileType.Wal: assert self.pghoard.wal_file_deletion_queue.qsize() == 1