Skip to content

Commit 468641b

Browse files
vietcgi authored and barthisrael committed
Add LZ4 compression for cloud backups
This adds LZ4 compression support for the `barman-cloud-backup` and `barman-cloud-wal-archive` commands, providing another compression alternative to the existing options.

Core implementation:
- Add `Lz4Compressor` class with streaming compression/decompression
- Add `flush()` method to `ChunkedCompressor` for proper frame termination
- Update `CloudTarUploader.close()` to flush the compressor before finalization
- Add `--lz4` CLI argument to the `barman-cloud-backup` command
- Add `.lz4` file extension handling for backup tar files
- Add `tar.lz4` extension parsing in the backup catalog

This feature was originally suggested and implemented by @vietcgi in PR #1144.

Closes #1144
References: BAR-1013
Signed-off-by: Barbara Leidens <barbara.leidens@enterprisedb.com>
1 parent ee7b457 commit 468641b

3 files changed

Lines changed: 113 additions & 7 deletions

File tree

barman/clients/cloud_backup.py

Lines changed: 7 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -308,6 +308,13 @@ def parse_arguments(args=None):
308308
const="snappy",
309309
dest="compression",
310310
)
311+
compression.add_argument(
312+
"--lz4",
313+
help="lz4-compress the backup while uploading to the cloud",
314+
action="store_const",
315+
const="lz4",
316+
dest="compression",
317+
)
311318
parser.add_argument(
312319
"-h",
313320
"--host",

barman/clients/cloud_compression.py

Lines changed: 94 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -19,7 +19,11 @@
1919

2020
from abc import ABCMeta, abstractmethod
2121

22-
from barman.compression import _try_import_snappy, get_internal_compressor
22+
from barman.compression import (
23+
_try_import_lz4,
24+
_try_import_snappy,
25+
get_internal_compressor,
26+
)
2327
from barman.utils import with_metaclass
2428

2529

@@ -49,6 +53,19 @@ def decompress(self, data):
4953
:rtype: bytes
5054
"""
5155

56+
def flush(self):
    """
    Return whatever compressed output is still pending once all input has
    been supplied.

    Callers are expected to invoke this after the final add_chunk() call so
    that buffered data and any end-of-stream marker reach the output.
    Compressors with nothing left to emit can rely on this default, which
    yields an empty bytes object.

    :return: The trailing compressed data, empty by default
    :rtype: bytes
    """
    return bytes()
68+
5269

5370
class SnappyCompressor(ChunkedCompressor):
5471
"""
@@ -82,20 +99,89 @@ def decompress(self, data):
8299
return self.decompressor.decompress(data)
83100

84101

102+
class Lz4Compressor(ChunkedCompressor):
    """
    ChunkedCompressor implementation backed by lz4.

    Compression and decompression both go through lz4.frame. Compression is
    stateful: every add_chunk() call feeds the same compressor object, and
    flush() has to be called once all data has been supplied so that the
    frame end marker is produced.
    """

    def __init__(self):
        # Only the frame submodule is needed; the compressor and decompressor
        # objects are created lazily on first use.
        self._lz4_frame = _try_import_lz4().frame
        self._compressor = None
        self._decompressor = None
        self._started = False
        self._flushed = False

    def add_chunk(self, data):
        """
        Compress one chunk of data and return the resulting bytes.

        The first call creates the frame compressor and prefixes the output
        with the bytes returned by begin(); later calls only return the
        compressed form of the supplied chunk.

        :param bytes data: The chunk of data to be compressed
        :return: The compressed data
        :rtype: bytes
        """
        if self._compressor is None:
            self._compressor = self._lz4_frame.LZ4FrameCompressor(auto_flush=True)

        if self._started:
            return self._compressor.compress(data)

        # First chunk: open the frame before compressing any payload.
        self._started = True
        frame_header = self._compressor.begin()
        return frame_header + self._compressor.compress(data)

    def decompress(self, data):
        """
        Decompress one chunk of data and return the uncompressed bytes.

        A single LZ4FrameDecompressor is created on first use and reused for
        every subsequent call, so it carries the streaming state between
        chunks.

        :param bytes data: The chunk of data to be decompressed
        :return: The decompressed data
        :rtype: bytes
        """
        decompressor = self._decompressor
        if decompressor is None:
            decompressor = self._lz4_frame.LZ4FrameDecompressor()
            self._decompressor = decompressor
        return decompressor.decompress(data)

    def flush(self):
        """
        Finish the frame and return its closing bytes.

        Has an effect only once: when compression has started and the frame
        has not been flushed yet. In every other situation (nothing was
        compressed, or flush() was already called) empty bytes are returned.

        :return: The frame end marker bytes
        :rtype: bytes
        """
        frame_open = self._compressor is not None and self._started
        if not frame_open or self._flushed:
            return b""
        self._flushed = True
        return self._compressor.flush()
167+
168+
85169
def get_compressor(compression):
    """
    Build the ChunkedCompressor matching the requested compression algorithm.

    Snappy and lz4 are handled here. Every other compression algorithm
    supported by barman cloud relies on the compression built into TarFile,
    so no ChunkedCompressor is returned for it.

    :param str compression: The compression algorithm to use. Can be set to snappy,
      lz4, or any compression supported by the TarFile mode string.
    :return: A ChunkedCompressor capable of compressing and decompressing using the
      specified compression, or None when the algorithm is not handled here.
    :rtype: ChunkedCompressor
    """
    if compression == "snappy":
        chunked_compressor = SnappyCompressor()
    elif compression == "lz4":
        chunked_compressor = Lz4Compressor()
    else:
        # Handled by TarFile itself (or no compression at all).
        chunked_compressor = None
    return chunked_compressor
100186

101187

@@ -107,12 +193,13 @@ def get_streaming_tar_mode(mode, compression):
107193
ignored so that barman-cloud can apply them itself.
108194
109195
:param str mode: The file mode to use, either r or w.
110-
:param str compression: The compression algorithm to use. Can be set to snappy
111-
or any compression supported by the TarFile mode string.
196+
:param str compression: The compression algorithm to use. Can be set to snappy,
197+
lz4, or any compression supported by the TarFile mode string.
112198
:return: The full filemode for a streaming tar file
113199
:rtype: str
114200
"""
115-
if compression == "snappy" or compression is None:
201+
# Compression algorithms that require manual handling (not built into TarFile)
202+
if compression in ("snappy", "lz4") or compression is None:
116203
return "%s|" % mode
117204
else:
118205
return "%s|%s" % (mode, compression)

barman/cloud.py

Lines changed: 12 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -311,6 +311,14 @@ def flush(self):
311311
def close(self):
312312
if self.tar:
313313
self.tar.close()
314+
# Flush any remaining compressed data (e.g., lz4 frame end marker)
315+
if self.compressor:
316+
final_bytes = self.compressor.flush()
317+
if final_bytes:
318+
if not self.buffer:
319+
self.buffer = self._buffer()
320+
self.buffer.write(final_bytes)
321+
self.size += len(final_bytes)
314322
self.flush()
315323
self.cloud_interface.async_complete_multipart_upload(
316324
upload_metadata=self.upload_metadata,
@@ -401,6 +409,8 @@ def _build_dest_name(self, name, count=0):
401409
components.append(".bz2")
402410
elif self.compression == "snappy":
403411
components.append(".snappy")
412+
elif self.compression == "lz4":
413+
components.append(".lz4")
404414
return "".join(components)
405415

406416
def _get_tar(self, name):
@@ -2384,6 +2394,8 @@ def get_backup_files(self, backup_info, allow_missing=False):
23842394
info.compression = "bzip2"
23852395
elif ext == "tar.snappy":
23862396
info.compression = "snappy"
2397+
elif ext == "tar.lz4":
2398+
info.compression = "lz4"
23872399
else:
23882400
_logger.warning("Skipping unknown extension: %s", ext)
23892401
continue

0 commit comments

Comments
 (0)