-
-
Notifications
You must be signed in to change notification settings - Fork 729
/
archive.py
1935 lines (1747 loc) · 84.7 KB
/
archive.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import errno
import json
import os
import socket
import stat
import sys
import time
from collections import OrderedDict
from contextlib import contextmanager
from datetime import datetime, timezone, timedelta
from functools import partial
from getpass import getuser
from io import BytesIO
from itertools import groupby, zip_longest
from shutil import get_terminal_size
import msgpack
from .logger import create_logger
logger = create_logger()
from . import xattr
from .chunker import Chunker
from .cache import ChunkListEntry
from .crypto.key import key_factory
from .compress import Compressor, CompressionSpec
from .constants import * # NOQA
from .crypto.low_level import IntegrityError as IntegrityErrorBase
from .hashindex import ChunkIndex, ChunkIndexEntry, CacheSynchronizer
from .helpers import Manifest
from .helpers import hardlinkable
from .helpers import ChunkIteratorFileWrapper, open_item
from .helpers import Error, IntegrityError, set_ec
from .helpers import uid2user, user2uid, gid2group, group2gid
from .helpers import parse_timestamp, to_localtime
from .helpers import OutputTimestamp, format_timedelta, format_file_size, file_status, FileSize
from .helpers import safe_encode, safe_decode, make_path_safe, remove_surrogates
from .helpers import StableDict
from .helpers import bin_to_hex
from .helpers import safe_ns
from .helpers import ellipsis_truncate, ProgressIndicatorPercent, log_multi
from .patterns import PathPrefixPattern, FnmatchPattern, IECommand
from .item import Item, ArchiveItem, ItemDiff
from .platform import acl_get, acl_set, set_flags, get_flags, swidth
from .remote import cache_if_remote
from .repository import Repository, LIST_SCAN_LIMIT
# os.lchmod is only available on some platforms (notably not on Linux)
has_lchmod = hasattr(os, 'lchmod')

# flags for opening backup input files: read-only, binary (O_BINARY is a no-op
# outside Windows), optionally with O_NOATIME (where available) so that reading
# a file for backup does not update its access time.
flags_normal = os.O_RDONLY | getattr(os, 'O_BINARY', 0)
flags_noatime = flags_normal | getattr(os, 'O_NOATIME', 0)
class Statistics:
    """Accumulator for archive size statistics.

    Tracks original size (osize), compressed size (csize), deduplicated size
    (usize, i.e. compressed size of unique chunks only) and number of files.
    """

    # column layout shared by callers that print several Statistics rows
    summary = "{label:15} {stats.osize_fmt:>20s} {stats.csize_fmt:>20s} {stats.usize_fmt:>20s}"

    def __init__(self, output_json=False):
        self.output_json = output_json
        self.osize = 0
        self.csize = 0
        self.usize = 0
        self.nfiles = 0
        # timestamp when last progress was shown
        self.last_progress = 0

    def update(self, size, csize, unique):
        """Account one chunk; *unique* chunks also count towards the deduplicated size."""
        self.osize += size
        self.csize += csize
        if unique:
            self.usize += csize

    def __add__(self, other):
        if not isinstance(other, Statistics):
            raise TypeError('can only add Statistics objects')
        combined = Statistics(self.output_json)
        combined.osize = self.osize + other.osize
        combined.csize = self.csize + other.csize
        combined.usize = self.usize + other.usize
        combined.nfiles = self.nfiles + other.nfiles
        return combined

    def __str__(self):
        return self.summary.format(stats=self, label='This archive:')

    def __repr__(self):
        return "<{cls} object at {hash:#x} ({self.osize}, {self.csize}, {self.usize})>".format(
            cls=type(self).__name__, hash=id(self), self=self)

    def as_dict(self):
        return {
            'original_size': FileSize(self.osize),
            'compressed_size': FileSize(self.csize),
            'deduplicated_size': FileSize(self.usize),
            'nfiles': self.nfiles,
        }

    @property
    def osize_fmt(self):
        return format_file_size(self.osize)

    @property
    def usize_fmt(self):
        return format_file_size(self.usize)

    @property
    def csize_fmt(self):
        return format_file_size(self.csize)

    def show_progress(self, item=None, final=False, stream=None, dt=None):
        """Print a one-line progress update (JSON or terminal form), rate-limited by *dt* seconds."""
        now = time.monotonic()
        if dt is not None and now - self.last_progress <= dt:
            return
        self.last_progress = now
        if self.output_json:
            data = self.as_dict()
            data.update({
                'time': time.time(),
                'type': 'archive_progress',
                'path': remove_surrogates(item.path if item else ''),
            })
            msg = json.dumps(data)
            end = '\n'
        else:
            columns, lines = get_terminal_size()
            if final:
                # blank the progress line
                msg = ' ' * columns
            else:
                msg = '{0.osize_fmt} O {0.csize_fmt} C {0.usize_fmt} D {0.nfiles} N '.format(self)
                path = remove_surrogates(item.path) if item else ''
                space = columns - swidth(msg)
                if space < 12:
                    msg = ''
                    space = columns - swidth(msg)
                if space >= 8:
                    msg += ellipsis_truncate(path, space)
            end = '\r'
        print(msg, end=end, file=stream or sys.stderr, flush=True)
def is_special(mode):
    """Return True for st_mode values of block/char devices and FIFOs.

    These file types get special treatment in --read-special mode.
    """
    checks = (stat.S_ISBLK, stat.S_ISCHR, stat.S_ISFIFO)
    return any(check(mode) for check in checks)
class BackupOSError(Exception):
    """
    Wrapper for OSError raised while accessing backup files.

    Borg does different kinds of IO, and IO failures have different consequences.
    This wrapper represents failures of input file or extraction IO.
    These are non-critical and are only reported (exit code = 1, warning).

    Any unwrapped IO error is critical and aborts execution (for example repository IO failure).
    """

    def __init__(self, op, os_error):
        self.op = op
        self.os_error = os_error
        # mirror the usual OSError attributes for convenient access by callers
        self.errno = os_error.errno
        self.strerror = os_error.strerror
        self.filename = os_error.filename

    def __str__(self):
        prefix = '%s: ' % self.op if self.op else ''
        return prefix + str(self.os_error)
class BackupIO:
    """Reusable context manager that re-raises OSError as BackupOSError.

    The operation name can be set either by calling the instance
    (``with backup_io('read'):``) or by assigning ``.op`` directly.
    """

    op = ''

    def __call__(self, op=''):
        self.op = op
        return self

    def __enter__(self):
        pass

    def __exit__(self, exc_type, exc_val, exc_tb):
        # only OSError (and subclasses) are wrapped; everything else propagates
        if exc_type is not None and issubclass(exc_type, OSError):
            raise BackupOSError(self.op, exc_val) from exc_val
# Shared module-level instance: used as `with backup_io('op'):` around input/output
# file access, or by setting `backup_io.op` before entering the context.
backup_io = BackupIO()
def backup_io_iter(iterator):
    """Wrap *iterator* so that any OSError raised by next() surfaces as BackupOSError('read', ...).

    Only the next() call runs inside the backup_io context; the yield happens
    outside it, so exceptions thrown into this generator by the consumer are
    not wrapped.
    """
    backup_io.op = 'read'
    while True:
        with backup_io:
            try:
                item = next(iterator)
            except StopIteration:
                return
        yield item
class DownloadPipeline:
    """Fetches and decrypts item metadata chunks from the repository."""

    def __init__(self, repository, key):
        self.repository = repository
        self.key = key

    def unpack_many(self, ids, filter=None, preload=False):
        """
        Return iterator of items.

        *ids* is a chunk ID list of an item stream. *filter* is a callable
        to decide whether an item will be yielded. *preload* preloads the data chunks of every yielded item.

        Warning: if *preload* is True then all data chunks of every yielded item have to be retrieved,
        otherwise preloaded chunks will accumulate in RemoteRepository and create a memory leak.
        """
        unpacker = msgpack.Unpacker(use_list=False)
        for data in self.fetch_many(ids):
            unpacker.feed(data)
            items = []
            for unpacked in unpacker:
                item = Item(internal_dict=unpacked)
                if 'chunks' in item:
                    # rebuild the ChunkListEntry namedtuples from raw tuples
                    item.chunks = [ChunkListEntry(*e) for e in item.chunks]
                items.append(item)
            if filter:
                items = [item for item in items if filter(item)]
            if preload:
                for item in items:
                    if 'chunks' in item:
                        self.repository.preload([c.id for c in item.chunks])
            yield from items

    def fetch_many(self, ids, is_preloaded=False):
        """Yield the decrypted payload of every chunk id in *ids*, in order."""
        fetched = self.repository.get_many(ids, is_preloaded=is_preloaded)
        for id_, data in zip(ids, fetched):
            yield self.key.decrypt(id_, data)
class ChunkBuffer:
    """Accumulates packed items in memory and chunks them into the item metadata stream."""

    BUFFER_SIZE = 8 * 1024 * 1024

    def __init__(self, key, chunker_params=ITEMS_CHUNKER_PARAMS):
        self.buffer = BytesIO()
        self.packer = msgpack.Packer(unicode_errors='surrogateescape')
        self.chunks = []
        self.key = key
        self.chunker = Chunker(self.key.chunk_seed, *chunker_params)

    def add(self, item):
        """Pack *item* into the buffer, flushing automatically once the buffer is full."""
        self.buffer.write(self.packer.pack(item.as_dict()))
        if self.is_full():
            self.flush()

    def write_chunk(self, chunk):
        # subclasses decide where chunks are stored (e.g. the cache)
        raise NotImplementedError

    def flush(self, flush=False):
        """Chunk the buffered data and write out complete chunks.

        Unless *flush* is True, the trailing partial chunk is kept in the
        buffer so it can grow with subsequently added items.
        """
        if self.buffer.tell() == 0:
            return
        self.buffer.seek(0)
        # The chunker returns a memoryview to its internal buffer,
        # thus a copy is needed before resuming the chunker iterator.
        chunks = [bytes(mv) for mv in self.chunker.chunkify(self.buffer)]
        self.buffer.seek(0)
        self.buffer.truncate(0)
        keep_last = not flush and len(chunks) > 1
        complete = chunks[:-1] if keep_last else chunks
        for chunk in complete:
            self.chunks.append(self.write_chunk(chunk))
        if keep_last:
            # retain the partial tail chunk for the next flush
            self.buffer.write(chunks[-1])

    def is_full(self):
        return self.buffer.tell() > self.BUFFER_SIZE
class CacheChunkBuffer(ChunkBuffer):
    """ChunkBuffer that stores finished chunks via the cache (deduplicated, async)."""

    def __init__(self, cache, key, stats, chunker_params=ITEMS_CHUNKER_PARAMS):
        super().__init__(key, chunker_params)
        self.cache = cache
        self.stats = stats

    def write_chunk(self, chunk):
        id_, _, _ = self.cache.add_chunk(self.key.id_hash(chunk), chunk, self.stats, wait=False)
        # drain any already-available async results without blocking
        self.cache.repository.async_response(wait=False)
        return id_
class Archive:
    """One archive inside a repository.

    Ties together repository, key, manifest and cache and provides creation
    (item buffer + save), loading, info/stats, extraction, renaming, deletion
    and archive comparison.
    """

    class DoesNotExist(Error):
        """Archive {} does not exist"""

    class AlreadyExists(Error):
        """Archive {} already exists"""

    class IncompatibleFilesystemEncodingError(Error):
        """Failed to encode filename "{}" into file system encoding "{}". Consider configuring the LANG environment variable."""

    def __init__(self, repository, key, manifest, name, cache=None, create=False,
                 checkpoint_interval=300, numeric_owner=False, noatime=False, noctime=False, nobsdflags=False,
                 progress=False, chunker_params=CHUNKER_PARAMS, start=None, start_monotonic=None, end=None,
                 consider_part_files=False, log_json=False):
        self.cwd = os.getcwd()
        self.key = key
        self.repository = repository
        self.cache = cache
        self.manifest = manifest
        self.hard_links = {}
        self.stats = Statistics(output_json=log_json)
        self.show_progress = progress
        self.name = name  # overwritten later with name from archive metadata
        self.name_in_manifest = name  # can differ from .name later (if borg check fixed duplicate archive names)
        self.comment = None
        self.checkpoint_interval = checkpoint_interval
        self.numeric_owner = numeric_owner
        self.noatime = noatime
        self.noctime = noctime
        self.nobsdflags = nobsdflags
        assert (start is None) == (start_monotonic is None), 'Logic error: if start is given, start_monotonic must be given as well and vice versa.'
        if start is None:
            start = datetime.utcnow()
            start_monotonic = time.monotonic()
        self.chunker_params = chunker_params
        self.start = start
        self.start_monotonic = start_monotonic
        if end is None:
            end = datetime.utcnow()
        self.end = end
        self.consider_part_files = consider_part_files
        self.pipeline = DownloadPipeline(self.repository, self.key)
        self.create = create
        if self.create:
            self.items_buffer = CacheChunkBuffer(self.cache, self.key, self.stats)
            if name in manifest.archives:
                raise self.AlreadyExists(name)
            # find a checkpoint name that is not taken yet
            i = 0
            while True:
                self.checkpoint_name = '%s.checkpoint%s' % (name, i and ('.%d' % i) or '')
                if self.checkpoint_name not in manifest.archives:
                    break
                i += 1
        else:
            info = self.manifest.archives.get(name)
            if info is None:
                raise self.DoesNotExist(name)
            self.load(info.id)
            self.zeros = None  # lazily created all-zero chunk for sparse extraction

    def _load_meta(self, id):
        """Fetch, decrypt and unpack the archive metadata chunk with the given *id*."""
        data = self.key.decrypt(id, self.repository.get(id))
        metadata = ArchiveItem(internal_dict=msgpack.unpackb(data, unicode_errors='surrogateescape'))
        if metadata.version != 1:
            raise Exception('Unknown archive metadata version')
        return metadata

    def load(self, id):
        """Load the archive metadata for *id* and initialize name/comment from it."""
        self.id = id
        self.metadata = self._load_meta(self.id)
        self.metadata.cmdline = [safe_decode(arg) for arg in self.metadata.cmdline]
        self.name = self.metadata.name
        self.comment = self.metadata.get('comment', '')

    @property
    def ts(self):
        """Timestamp of archive creation (start) in UTC"""
        ts = self.metadata.time
        return parse_timestamp(ts)

    @property
    def ts_end(self):
        """Timestamp of archive creation (end) in UTC"""
        # fall back to time if there is no time_end present in metadata
        ts = self.metadata.get('time_end') or self.metadata.time
        return parse_timestamp(ts)

    @property
    def fpr(self):
        """Hex fingerprint (= archive id) string."""
        return bin_to_hex(self.id)

    @property
    def duration(self):
        return format_timedelta(self.end - self.start)

    @property
    def duration_from_meta(self):
        return format_timedelta(self.ts_end - self.ts)

    def info(self):
        """Return a dict describing this archive (for JSON output)."""
        if self.create:
            stats = self.stats
            start = self.start.replace(tzinfo=timezone.utc)
            end = self.end.replace(tzinfo=timezone.utc)
        else:
            stats = self.calc_stats(self.cache)
            start = self.ts
            end = self.ts_end
        info = {
            'name': self.name,
            'id': self.fpr,
            'start': OutputTimestamp(start),
            'end': OutputTimestamp(end),
            'duration': (end - start).total_seconds(),
            'stats': stats.as_dict(),
            'limits': {
                'max_archive_size': self.cache.chunks[self.id].csize / MAX_DATA_SIZE,
            },
        }
        if self.create:
            info['command_line'] = sys.argv
        else:
            info.update({
                'command_line': self.metadata.cmdline,
                'hostname': self.metadata.hostname,
                'username': self.metadata.username,
                'comment': self.metadata.get('comment', ''),
                'chunker_params': self.metadata.get('chunker_params', ''),
            })
        return info

    def __str__(self):
        return '''\
Archive name: {0.name}
Archive fingerprint: {0.fpr}
Time (start): {start}
Time (end): {end}
Duration: {0.duration}
Number of files: {0.stats.nfiles}
Utilization of max. archive size: {csize_max:.0%}
'''.format(
            self,
            start=OutputTimestamp(self.start.replace(tzinfo=timezone.utc)),
            end=OutputTimestamp(self.end.replace(tzinfo=timezone.utc)),
            csize_max=self.cache.chunks[self.id].csize / MAX_DATA_SIZE)

    def __repr__(self):
        return 'Archive(%r)' % self.name

    def item_filter(self, item, filter=None):
        if not self.consider_part_files and 'part' in item:
            # this is a part(ial) file, we usually don't want to consider it.
            return False
        return filter(item) if filter else True

    def iter_items(self, filter=None, preload=False):
        for item in self.pipeline.unpack_many(self.metadata.items, preload=preload,
                                              filter=lambda item: self.item_filter(item, filter)):
            yield item

    def add_item(self, item, show_progress=True):
        if show_progress and self.show_progress:
            self.stats.show_progress(item=item, dt=0.2)
        self.items_buffer.add(item)

    def write_checkpoint(self):
        """Save a checkpoint archive, then immediately remove it from the manifest again.

        The chunks it references stay referenced via the cache until the final save.
        """
        self.save(self.checkpoint_name)
        del self.manifest.archives[self.checkpoint_name]
        self.cache.chunk_decref(self.id, self.stats)

    def save(self, name=None, comment=None, timestamp=None, additional_metadata=None):
        """Flush pending items, write the archive metadata chunk and commit everything."""
        name = name or self.name
        if name in self.manifest.archives:
            raise self.AlreadyExists(name)
        self.items_buffer.flush(flush=True)
        duration = timedelta(seconds=time.monotonic() - self.start_monotonic)
        if timestamp is None:
            end = datetime.utcnow()
            start = end - duration
        else:
            end = timestamp + duration
            start = timestamp
        self.start = start
        self.end = end
        metadata = {
            'version': 1,
            'name': name,
            'comment': comment or '',
            'items': self.items_buffer.chunks,
            'cmdline': sys.argv,
            'hostname': socket.gethostname(),
            'username': getuser(),
            'time': start.strftime(ISO_FORMAT),
            'time_end': end.strftime(ISO_FORMAT),
            'chunker_params': self.chunker_params,
        }
        metadata.update(additional_metadata or {})
        metadata = ArchiveItem(metadata)
        data = self.key.pack_and_authenticate_metadata(metadata.as_dict(), context=b'archive')
        self.id = self.key.id_hash(data)
        self.cache.add_chunk(self.id, data, self.stats)
        while self.repository.async_response(wait=True) is not None:
            pass
        self.manifest.archives[name] = (self.id, metadata.time)
        self.manifest.write()
        self.repository.commit()
        self.cache.commit()

    def calc_stats(self, cache):
        """Compute archive statistics by replaying the item stream against the chunks cache."""
        def add(id):
            entry = cache.chunks[id]
            archive_index.add(id, 1, entry.size, entry.csize)

        archive_index = ChunkIndex()
        sync = CacheSynchronizer(archive_index)
        add(self.id)
        pi = ProgressIndicatorPercent(total=len(self.metadata.items), msg='Calculating statistics... %3d%%')
        for id, chunk in zip(self.metadata.items, self.repository.get_many(self.metadata.items)):
            pi.show(increase=1)
            add(id)
            data = self.key.decrypt(id, chunk)
            sync.feed(data)
        stats = Statistics()
        stats.osize, stats.csize, unique_size, stats.usize, unique_chunks, chunks = archive_index.stats_against(cache.chunks)
        stats.nfiles = sync.num_files
        pi.finish()
        return stats

    @contextmanager
    def extract_helper(self, dest, item, path, stripped_components, original_path, hardlink_masters):
        """Hardlink bookkeeping around the extraction of one item; yields whether a hardlink was created."""
        hardlink_set = False
        # Hard link?
        if 'source' in item:
            source = os.path.join(dest, *item.source.split(os.sep)[stripped_components:])
            chunks, link_target = hardlink_masters.get(item.source, (None, source))
            if link_target:
                # Hard link was extracted previously, just link
                with backup_io('link'):
                    os.link(link_target, path)
                hardlink_set = True
            elif chunks is not None:
                # assign chunks to this item, since the item which had the chunks was not extracted
                item.chunks = chunks
        yield hardlink_set
        if not hardlink_set and hardlink_masters:
            # Update master entry with extracted item path, so that following hardlinks don't extract twice.
            hardlink_masters[item.get('source') or original_path] = (None, path)

    def extract_item(self, item, restore_attrs=True, dry_run=False, stdout=False, sparse=False,
                     hardlink_masters=None, stripped_components=0, original_path=None, pi=None):
        """
        Extract archive item.

        :param item: the item to extract
        :param restore_attrs: restore file attributes
        :param dry_run: do not write any data
        :param stdout: write extracted data to stdout
        :param sparse: write sparse files (chunk-granularity, independent of the original being sparse)
        :param hardlink_masters: maps paths to (chunks, link_target) for extracting subtrees with hardlinks correctly
        :param stripped_components: stripped leading path components to correct hard link extraction
        :param original_path: 'path' key as stored in archive
        :param pi: ProgressIndicatorPercent (or similar) for file extraction progress (in bytes)
        """
        hardlink_masters = hardlink_masters or {}
        has_damaged_chunks = 'chunks_healthy' in item
        if dry_run or stdout:
            if 'chunks' in item:
                item_chunks_size = 0
                for data in self.pipeline.fetch_many([c.id for c in item.chunks], is_preloaded=True):
                    if pi:
                        pi.show(increase=len(data), info=[remove_surrogates(item.path)])
                    if stdout:
                        sys.stdout.buffer.write(data)
                    item_chunks_size += len(data)
                if stdout:
                    sys.stdout.buffer.flush()
                if 'size' in item:
                    item_size = item.size
                    if item_size != item_chunks_size:
                        logger.warning('{}: size inconsistency detected: size {}, chunks size {}'.format(
                            item.path, item_size, item_chunks_size))
                if has_damaged_chunks:
                    logger.warning('File %s has damaged (all-zero) chunks. Try running borg check --repair.' %
                                   remove_surrogates(item.path))
            return

        original_path = original_path or item.path
        dest = self.cwd
        if item.path.startswith(('/', '../')):
            raise Exception('Path should be relative and local')
        path = os.path.join(dest, item.path)
        # Attempt to remove existing files, ignore errors on failure
        try:
            st = os.stat(path, follow_symlinks=False)
            if stat.S_ISDIR(st.st_mode):
                os.rmdir(path)
            else:
                os.unlink(path)
        except UnicodeEncodeError:
            raise self.IncompatibleFilesystemEncodingError(path, sys.getfilesystemencoding()) from None
        except OSError:
            pass

        def make_parent(path):
            # create missing parent directories (os.makedirs would raise on existing ones)
            parent_dir = os.path.dirname(path)
            if not os.path.exists(parent_dir):
                os.makedirs(parent_dir)

        mode = item.mode
        if stat.S_ISREG(mode):
            with backup_io('makedirs'):
                make_parent(path)
            with self.extract_helper(dest, item, path, stripped_components, original_path,
                                     hardlink_masters) as hardlink_set:
                if hardlink_set:
                    return
                if sparse and self.zeros is None:
                    self.zeros = b'\0' * (1 << self.chunker_params[1])
                with backup_io('open'):
                    fd = open(path, 'wb')
                with fd:
                    ids = [c.id for c in item.chunks]
                    for data in self.pipeline.fetch_many(ids, is_preloaded=True):
                        if pi:
                            pi.show(increase=len(data), info=[remove_surrogates(item.path)])
                        with backup_io('write'):
                            if sparse and self.zeros.startswith(data):
                                # all-zero chunk: create a hole in a sparse file
                                fd.seek(len(data), 1)
                            else:
                                fd.write(data)
                    with backup_io('truncate_and_attrs'):
                        pos = item_chunks_size = fd.tell()
                        fd.truncate(pos)
                        fd.flush()
                        self.restore_attrs(path, item, fd=fd.fileno())
                if 'size' in item:
                    item_size = item.size
                    if item_size != item_chunks_size:
                        logger.warning('{}: size inconsistency detected: size {}, chunks size {}'.format(
                            item.path, item_size, item_chunks_size))
                if has_damaged_chunks:
                    logger.warning('File %s has damaged (all-zero) chunks. Try running borg check --repair.' %
                                   remove_surrogates(item.path))
            return

        with backup_io:
            # No repository access beyond this point.
            if stat.S_ISDIR(mode):
                make_parent(path)
                if not os.path.exists(path):
                    os.mkdir(path)
                if restore_attrs:
                    self.restore_attrs(path, item)
            elif stat.S_ISLNK(mode):
                make_parent(path)
                source = item.source
                try:
                    os.symlink(source, path)
                except UnicodeEncodeError:
                    raise self.IncompatibleFilesystemEncodingError(source, sys.getfilesystemencoding()) from None
                self.restore_attrs(path, item, symlink=True)
            elif stat.S_ISFIFO(mode):
                make_parent(path)
                with self.extract_helper(dest, item, path, stripped_components, original_path,
                                         hardlink_masters) as hardlink_set:
                    if hardlink_set:
                        return
                    os.mkfifo(path)
                    self.restore_attrs(path, item)
            elif stat.S_ISCHR(mode) or stat.S_ISBLK(mode):
                make_parent(path)
                with self.extract_helper(dest, item, path, stripped_components, original_path,
                                         hardlink_masters) as hardlink_set:
                    if hardlink_set:
                        return
                    os.mknod(path, item.mode, item.rdev)
                    self.restore_attrs(path, item)
            else:
                raise Exception('Unknown archive item type %r' % item.mode)

    def restore_attrs(self, path, item, symlink=False, fd=None):
        """
        Restore filesystem attributes on *path* (*fd*) from *item*.

        Does not access the repository.
        """
        backup_io.op = 'attrs'
        uid = gid = None
        if not self.numeric_owner:
            uid = user2uid(item.user)
            gid = group2gid(item.group)
        uid = item.uid if uid is None else uid
        gid = item.gid if gid is None else gid
        # This code is a bit of a mess due to os specific differences
        try:
            if fd:
                os.fchown(fd, uid, gid)
            else:
                os.chown(path, uid, gid, follow_symlinks=False)
        except OSError:
            pass
        if fd:
            os.fchmod(fd, item.mode)
        elif not symlink:
            os.chmod(path, item.mode)
        elif has_lchmod:  # Not available on Linux
            os.lchmod(path, item.mode)
        mtime = item.mtime
        if 'atime' in item:
            atime = item.atime
        else:
            # old archives only had mtime in item metadata
            atime = mtime
        if 'birthtime' in item:
            birthtime = item.birthtime
            try:
                # This should work on FreeBSD, NetBSD, and Darwin and be harmless on other platforms.
                # See utimes(2) on either of the BSDs for details.
                if fd:
                    os.utime(fd, None, ns=(atime, birthtime))
                else:
                    os.utime(path, None, ns=(atime, birthtime), follow_symlinks=False)
            except OSError:
                # some systems don't support calling utime on a symlink
                pass
        try:
            if fd:
                os.utime(fd, None, ns=(atime, mtime))
            else:
                os.utime(path, None, ns=(atime, mtime), follow_symlinks=False)
        except OSError:
            # some systems don't support calling utime on a symlink
            pass
        acl_set(path, item, self.numeric_owner)
        # chown removes Linux capabilities, so set the extended attributes at the end, after chown, since they include
        # the Linux capabilities in the "security.capability" attribute.
        xattrs = item.get('xattrs', {})
        for k, v in xattrs.items():
            try:
                xattr.setxattr(fd or path, k, v, follow_symlinks=False)
            except OSError as e:
                if e.errno == errno.E2BIG:
                    # xattr is too big
                    logger.warning('%s: Value or key of extended attribute %s is too big for this filesystem' %
                                   (path, k.decode()))
                    set_ec(EXIT_WARNING)
                elif e.errno == errno.ENOTSUP:
                    # xattrs not supported here
                    logger.warning('%s: Extended attributes are not supported on this filesystem' % path)
                    set_ec(EXIT_WARNING)
                elif e.errno == errno.EACCES:
                    # permission denied to set this specific xattr (this may happen related to security.* keys)
                    logger.warning('%s: Permission denied when setting extended attribute %s' % (path, k.decode()))
                    set_ec(EXIT_WARNING)
                else:
                    raise
        # bsdflags include the immutable flag and need to be set last:
        if not self.nobsdflags and 'bsdflags' in item:
            try:
                set_flags(path, item.bsdflags, fd=fd)
            except OSError:
                pass

    def set_meta(self, key, value):
        """Rewrite the archive metadata chunk with attribute *key* set to *value*."""
        metadata = self._load_meta(self.id)
        setattr(metadata, key, value)
        data = msgpack.packb(metadata.as_dict(), unicode_errors='surrogateescape')
        new_id = self.key.id_hash(data)
        self.cache.add_chunk(new_id, data, self.stats)
        self.manifest.archives[self.name] = (new_id, metadata.time)
        self.cache.chunk_decref(self.id, self.stats)
        self.id = new_id

    def rename(self, name):
        if name in self.manifest.archives:
            raise self.AlreadyExists(name)
        oldname = self.name
        self.name = name
        self.set_meta('name', name)
        del self.manifest.archives[oldname]

    def delete(self, stats, progress=False, forced=False):
        """Delete this archive, decref'ing all referenced chunks.

        With *forced*, errors caused by corrupted metadata are tolerated so that
        at least the manifest entry and the archive superblock get removed.
        """
        class ChunksIndexError(Error):
            """Chunk ID {} missing from chunks index, corrupted chunks index - aborting transaction."""

        exception_ignored = object()

        def fetch_async_response(wait=True):
            try:
                return self.repository.async_response(wait=wait)
            except Repository.ObjectNotFound:
                nonlocal error
                # object not in repo - strange, but we wanted to delete it anyway.
                if forced == 0:
                    raise
                error = True
                return exception_ignored  # must not return None here

        def chunk_decref(id_, stats):
            try:
                self.cache.chunk_decref(id_, stats, wait=False)
            except KeyError:
                cid = bin_to_hex(id_)
                raise ChunksIndexError(cid)
            else:
                fetch_async_response(wait=False)

        error = False
        try:
            unpacker = msgpack.Unpacker(use_list=False)
            items_ids = self.metadata.items
            pi = ProgressIndicatorPercent(total=len(items_ids), msg="Decrementing references %3.0f%%", msgid='archive.delete')
            for (i, (items_id, data)) in enumerate(zip(items_ids, self.repository.get_many(items_ids))):
                if progress:
                    pi.show(i)
                data = self.key.decrypt(items_id, data)
                unpacker.feed(data)
                chunk_decref(items_id, stats)
                try:
                    for item in unpacker:
                        item = Item(internal_dict=item)
                        if 'chunks' in item:
                            for chunk_id, size, csize in item.chunks:
                                chunk_decref(chunk_id, stats)
                except (TypeError, ValueError):
                    # if items metadata spans multiple chunks and one chunk got dropped somehow,
                    # it could be that unpacker yields bad types
                    if forced == 0:
                        raise
                    error = True
            if progress:
                pi.finish()
        except (msgpack.UnpackException, Repository.ObjectNotFound):
            # items metadata corrupted
            if forced == 0:
                raise
            error = True
        # in forced delete mode, we try hard to delete at least the manifest entry,
        # if possible also the archive superblock, even if processing the items raises
        # some harmless exception.
        chunk_decref(self.id, stats)
        del self.manifest.archives[self.name]
        while fetch_async_response(wait=True) is not None:
            # we did async deletes, process outstanding results (== exceptions),
            # so there is nothing pending when we return and our caller wants to commit.
            pass
        if error:
            logger.warning('forced deletion succeeded, but the deleted archive was corrupted.')
            logger.warning('borg check --repair is required to free all space.')

    @staticmethod
    def _open_rb(path):
        """Open *path* read-only binary, preferring O_NOATIME when permitted."""
        try:
            # if we have O_NOATIME, this likely will succeed if we are root or owner of file:
            return os.open(path, flags_noatime)
        except PermissionError:
            if flags_noatime == flags_normal:
                # we do not have O_NOATIME, no need to try again:
                raise
            # Was this EPERM due to the O_NOATIME flag? Try again without it:
            return os.open(path, flags_normal)

    @staticmethod
    def compare_archives_iter(archive1, archive2, matcher=None, can_compare_chunk_ids=False):
        """
        Yields tuples with a path and an ItemDiff instance describing changes/indicating equality.

        :param matcher: PatternMatcher class to restrict results to only matching paths.
        :param can_compare_chunk_ids: Whether --chunker-params are the same for both archives.
        """
        def hardlink_master_seen(item):
            return 'source' not in item or not hardlinkable(item.mode) or item.source in hardlink_masters

        def is_hardlink_master(item):
            return item.get('hardlink_master', True) and 'source' not in item

        def update_hardlink_masters(item1, item2):
            if is_hardlink_master(item1) or is_hardlink_master(item2):
                hardlink_masters[item1.path] = (item1, item2)

        def has_hardlink_master(item, hardlink_masters):
            return hardlinkable(item.mode) and item.get('source') in hardlink_masters

        def compare_items(item1, item2):
            if has_hardlink_master(item1, hardlink_masters):
                item1 = hardlink_masters[item1.source][0]
            if has_hardlink_master(item2, hardlink_masters):
                item2 = hardlink_masters[item2.source][1]
            return ItemDiff(item1, item2,
                            archive1.pipeline.fetch_many([c.id for c in item1.get('chunks', [])]),
                            archive2.pipeline.fetch_many([c.id for c in item2.get('chunks', [])]),
                            can_compare_chunk_ids=can_compare_chunk_ids)

        def defer_if_necessary(item1, item2):
            """Adds item tuple to deferred if necessary and returns True, if items were deferred"""
            update_hardlink_masters(item1, item2)
            defer = not hardlink_master_seen(item1) or not hardlink_master_seen(item2)
            if defer:
                deferred.append((item1, item2))
            return defer

        orphans_archive1 = OrderedDict()
        orphans_archive2 = OrderedDict()
        deferred = []
        hardlink_masters = {}

        for item1, item2 in zip_longest(
                archive1.iter_items(lambda item: matcher.match(item.path)),
                archive2.iter_items(lambda item: matcher.match(item.path)),
        ):
            if item1 and item2 and item1.path == item2.path:
                if not defer_if_necessary(item1, item2):
                    yield (item1.path, compare_items(item1, item2))
                continue
            if item1:
                matching_orphan = orphans_archive2.pop(item1.path, None)
                if matching_orphan:
                    if not defer_if_necessary(item1, matching_orphan):
                        yield (item1.path, compare_items(item1, matching_orphan))
                else:
                    orphans_archive1[item1.path] = item1
            if item2:
                matching_orphan = orphans_archive1.pop(item2.path, None)
                if matching_orphan:
                    if not defer_if_necessary(matching_orphan, item2):
                        yield (matching_orphan.path, compare_items(matching_orphan, item2))
                else:
                    orphans_archive2[item2.path] = item2
        # At this point orphans_* contain items that had no matching partner in the other archive
        for added in orphans_archive2.values():
            path = added.path
            deleted_item = Item.create_deleted(path)
            update_hardlink_masters(deleted_item, added)
            yield (path, compare_items(deleted_item, added))
        for deleted in orphans_archive1.values():
            path = deleted.path
            deleted_item = Item.create_deleted(path)
            update_hardlink_masters(deleted, deleted_item)
            yield (path, compare_items(deleted, deleted_item))
        for item1, item2 in deferred:
            assert hardlink_master_seen(item1)
            assert hardlink_master_seen(item2)
            # bug fix: previously this yielded the stale `path` variable left over from the
            # orphan loops above (wrong path, or NameError if those loops never ran);
            # each deferred pair must be reported under its own path.
            yield (item1.path, compare_items(item1, item2))
class MetadataCollector:
    """Collects item metadata (stat fields, xattrs, ACLs, bsdflags) for files being archived."""

    def __init__(self, *, noatime, noctime, numeric_owner, nobsdflags, nobirthtime):
        self.noatime = noatime
        self.noctime = noctime
        self.numeric_owner = numeric_owner
        self.nobsdflags = nobsdflags
        self.nobirthtime = nobirthtime

    def stat_simple_attrs(self, st):
        """Return the basic attributes dict derived from a stat result."""
        attrs = {
            'mode': st.st_mode,
            'uid': st.st_uid,
            'gid': st.st_gid,
            'mtime': safe_ns(st.st_mtime_ns),
        }
        # borg can work with archives only having mtime (older attic archives do not have
        # atime/ctime). it can be useful to omit atime/ctime, if they change without the
        # file content changing - e.g. to get better metadata deduplication.
        if not self.noatime:
            attrs['atime'] = safe_ns(st.st_atime_ns)
        if not self.noctime:
            attrs['ctime'] = safe_ns(st.st_ctime_ns)
        if not self.nobirthtime and hasattr(st, 'st_birthtime'):
            # sadly, there's no stat_result.st_birthtime_ns
            attrs['birthtime'] = safe_ns(int(st.st_birthtime * 10**9))
        if self.numeric_owner:
            attrs['user'] = attrs['group'] = None
        else:
            attrs['user'] = uid2user(st.st_uid)
            attrs['group'] = gid2group(st.st_gid)
        return attrs

    def stat_ext_attrs(self, st, path):
        """Return extended attributes (xattrs, ACLs, bsdflags) for *path*."""
        attrs = {}
        bsdflags = 0
        with backup_io('extended stat'):
            xattrs = xattr.get_all(path, follow_symlinks=False)
            if not self.nobsdflags:
                bsdflags = get_flags(path, st)
            acl_get(path, attrs, st, self.numeric_owner)
        if xattrs:
            attrs['xattrs'] = StableDict(xattrs)
        if bsdflags:
            attrs['bsdflags'] = bsdflags
        return attrs

    def stat_attrs(self, st, path):
        """Return the combined simple + extended attribute dict."""
        attrs = self.stat_simple_attrs(st)
        attrs.update(self.stat_ext_attrs(st, path))
        return attrs
class ChunksProcessor:
# Processes an iterator of chunks for an Item
def __init__(self, *, key, cache,
add_item, write_checkpoint,
checkpoint_interval, rechunkify):
self.key = key
self.cache = cache
self.add_item = add_item
self.write_checkpoint = write_checkpoint
self.checkpoint_interval = checkpoint_interval
self.last_checkpoint = time.monotonic()
self.rechunkify = rechunkify
def write_part_file(self, item, from_chunk, number):
item = Item(internal_dict=item.as_dict())
length = len(item.chunks)
# the item should only have the *additional* chunks we processed after the last partial item:
item.chunks = item.chunks[from_chunk:]