/
shared.py
152 lines (119 loc) · 4.71 KB
/
shared.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
from __future__ import division
import sys
import ctypes
import os
import time
from subprocess import check_output
import platform
import uuid
# Length of the last line written by outputStatus() in overwrite mode; used to
# pad a shorter follow-up line so no stale characters remain on the console.
_outputStatusLastSize = 0
# When true, outputStatus() prints every status on its own line (terminated by
# a newline) instead of overwriting the previous one.
_outputStatusDontReplaceLine = False
class BackupDataError(Exception):
    """Exception type for backup data problems.

    NOTE(review): presumably signals invalid or inconsistent backup data; the
    raise sites are not visible in this module -- confirm against callers.
    """
class DDError(Exception):
    """Exception type for dd-related failures.

    NOTE(review): presumably signals a failed dd-style copy operation; the
    raise sites are not visible in this module -- confirm against callers.
    """
class BackupError(Exception):
    """Exception type for backup failures.

    NOTE(review): presumably signals a general failure of a backup run; the
    raise sites are not visible in this module -- confirm against callers.
    """
class UnimplementedPlatformError(Exception):
    """Raised when a feature is not implemented for the current platform.

    Within this module it is raised by findDiskDeviceIdentifierByUUID() when
    platform.system() is not 'Darwin'.
    """
class AverageSpeedCalculator(object):
    """Calculate the average copy speed over the most recent copy operations.

    Bracket each copy operation with startOfCycle() and endOfCycle(). The
    average is the total number of bytes copied divided by the total elapsed
    time of the retained samples (the last maxSamples cycles).
    """

    def __init__(self, maxSamples):
        """Create a calculator that retains the last maxSamples cycles."""
        # time.time() value captured by the most recent startOfCycle()
        self.startTime = None
        # Bytes per second over the retained samples; None until computable
        self.currentAverageSpeed = None
        self.maxSamples = maxSamples
        # Elapsed seconds of each retained cycle (parallel to bytesCopiedList)
        self.timingList = []
        # Bytes copied in each retained cycle (parallel to timingList)
        self.bytesCopiedList = []

    def startOfCycle(self):
        """Record the start time of a copy operation."""
        self.startTime = time.time()

    def endOfCycle(self, bytesCopied):
        """Record the end of a copy operation and refresh the average speed.

        bytesCopied is the number of bytes copied since startOfCycle(). If the
        retained samples add up to zero elapsed time, the previously computed
        average (possibly None) is kept instead of dividing by zero.
        """
        # time.time() is a wall clock: it may not advance between two calls on
        # a coarse timer and can even step backwards, so clamp at zero.
        elapsed = max(time.time() - self.startTime, 0.0)
        self.timingList.append(elapsed)
        self.bytesCopiedList.append(bytesCopied)

        # Keep only the newest maxSamples entries of both parallel lists
        self.timingList = self.timingList[-self.maxSamples:]
        self.bytesCopiedList = self.bytesCopiedList[-self.maxSamples:]

        totalTime = sum(self.timingList)
        if totalTime > 0:
            self.currentAverageSpeed = sum(self.bytesCopiedList) / totalTime

    def averageSpeed(self):
        """Return the current average speed in bytes per second, or None."""
        return self.currentAverageSpeed
def outputStatus(str):
    """Write a status line to stdout, overwriting the previous status line.

    When the module flag _outputStatusDontReplaceLine is set, the text is
    written followed by a newline and nothing else happens. Otherwise the text
    is padded with spaces up to the length of the previously written line,
    written followed by a carriage return, and stdout is flushed.
    """
    global _outputStatusLastSize

    if _outputStatusDontReplaceLine:
        sys.stdout.write(str + '\n')
        return

    # Pad so that leftovers of a longer previous line are blanked out
    paddedLine = str.ljust(_outputStatusLastSize)
    sys.stdout.write(paddedLine + '\r')
    sys.stdout.flush()
    # The remembered size is the padded length that was actually written
    _outputStatusLastSize = len(paddedLine)
def humanReadableSize(bytes):
    """Format a size in bytes as a short human readable string.

    Values below 1024 are shown as whole bytes with a 'b' suffix. Larger
    values are scaled by powers of 1024 and shown with one decimal place and
    a 'K', 'M' or 'G' suffix; 'G' is the largest unit used.

    Relies on true division (the file enables it via __future__ on Python 2).
    """
    kib = 1024
    mib = kib * 1024
    gib = mib * 1024

    if bytes < kib:
        return '%db' % bytes

    # (exclusive upper bound, divisor, format) for the intermediate units
    scaledUnits = (
        (mib, kib, '%.1fK'),
        (gib, mib, '%.1fM'),
    )
    for upperBound, divisor, template in scaledUnits:
        if bytes < upperBound:
            return template % (bytes / divisor)

    return '%.1fG' % (bytes / gib)
def humanReadableSizeToBytes(value):
    """Convert a human readable size value into an exact number of bytes.

    Uses the same format as dd: a number optionally followed by a single
    case-insensitive suffix, where b = 512, k = 1024, m = 1048576,
    g = 1073741824 and w = the size of a C int. The number is read as
    hexadecimal when it starts with '0x', as octal when it starts with '0',
    and as decimal otherwise. Surrounding whitespace is ignored.

    In hexadecimal notation a trailing 'b' is a digit, not the 512-byte block
    suffix, so '0x1b' is 27.

    Raises ValueError if the value is empty or the number cannot be parsed.
    """
    validSuffixes = {'b':512, 'k':1024, 'm':1048576, 'g':1073741824, 'w':ctypes.sizeof(ctypes.c_int)}

    value = value.lower().strip()
    if not value:
        raise ValueError('Size value must not be empty')

    isHex = value.startswith('0x')
    lastChar = value[-1]
    # 'b' is the only suffix letter that is also a hex digit; for hex input it
    # must stay part of the number.
    if lastChar in validSuffixes and not (isHex and lastChar == 'b'):
        numberPart = value[:-1]
        suffix = lastChar
    else:
        numberPart = value
        suffix = None

    if isHex:
        base = 16
    elif numberPart.startswith('0'):
        base = 8
    else:
        base = 10
    number = int(numberPart, base)

    if suffix is None:
        return number
    return number * validSuffixes[suffix]
def isPartFile(filename):
    """Return True if filename looks like a part file name.

    A part file name is exactly 13 characters long: the prefix 'part_'
    followed by 8 digits.
    """
    if len(filename) != 13:
        return False
    if not filename.startswith('part_'):
        return False
    digits = filename[-8:]
    return digits.isdigit()
def partsInSnapshot(dest):
    """Return the sorted names of all part files in the directory dest.

    Only directory entries accepted by isPartFile() are included.
    """
    partNames = [name for name in os.listdir(dest) if isPartFile(name)]
    partNames.sort()
    return partNames
def normalizeUUID(uuidString):
    """Return uuidString in its canonical lowercase string form.

    The string is parsed with uuid.UUID and converted back to text, so any
    notation uuid.UUID accepts is reduced to one spelling. Raises ValueError
    if the string is not a valid UUID.
    """
    parsed = uuid.UUID(uuidString)
    canonical = str(parsed)
    return canonical.lower()
def findDiskDeviceIdentifierByUUIDMacOS(uuidString):
    """Find the raw device path of the macOS disk or volume with the given UUID.

    Runs `diskutil list -plist` and searches the 'Partitions' and
    'APFSVolumes' entries of every item in 'AllDisksAndPartitions' for one
    whose 'DiskUUID' or 'VolumeUUID' equals uuidString.

    uuidString must already be lowercase: only the UUIDs reported by diskutil
    are lowercased before the comparison.

    Returns '/dev/r' + the matching entry's 'DeviceIdentifier', or None when
    nothing matches.
    """
    import plistlib

    diskUtilPlistData = check_output(['diskutil', 'list', '-plist'])
    # plistlib.loads is the Python 3.4+ API; readPlistFromString only exists
    # on Python 2, so fall back to it when loads is unavailable.
    parsePlist = getattr(plistlib, 'loads', None) or plistlib.readPlistFromString
    diskUtilData = parsePlist(diskUtilPlistData)
    allDisksAndPartitions = diskUtilData['AllDisksAndPartitions']

    def findDiskUUIDInList(partitionList, targetUUIDString):
        """Return the raw device path of the first matching entry, else None."""
        for partition in partitionList:
            matches = (('DiskUUID' in partition and partition['DiskUUID'].lower() == targetUUIDString) or
                       ('VolumeUUID' in partition and partition['VolumeUUID'].lower() == targetUUIDString))
            if matches:
                # Want to provide the unbuffered device identifier for better performance, hence the r
                return '/dev/r' + partition['DeviceIdentifier']
        return None

    for data in allDisksAndPartitions:
        # Plain partitions are checked before APFS volumes of the same item
        for listKey in ('Partitions', 'APFSVolumes'):
            if listKey in data:
                result = findDiskUUIDInList(data[listKey], uuidString)
                if result is not None:
                    return result

    return None
def findDiskDeviceIdentifierByUUID(uuidString):
    """Return the device identifier of the disk with the given UUID.

    The UUID is normalized with normalizeUUID() first, so an invalid UUID
    raises ValueError on every platform. Only macOS ('Darwin') is supported;
    on any other platform UnimplementedPlatformError is raised. Returns
    whatever findDiskDeviceIdentifierByUUIDMacOS() returns.
    """
    normalizedUUID = normalizeUUID(uuidString)

    if platform.system() != 'Darwin':
        raise UnimplementedPlatformError('Finding a device by UUID is not implemented for platform: %s' % platform.system())

    return findDiskDeviceIdentifierByUUIDMacOS(normalizedUUID)
def isUUID(uuidString):
    """Return True if uuidString can be parsed as a UUID, False otherwise.

    Non-string input such as None or an integer is reported as False instead
    of letting the TypeError or AttributeError from uuid.UUID escape.
    """
    try:
        uuid.UUID(uuidString)
    except (ValueError, TypeError, AttributeError):
        # ValueError: malformed string. TypeError/AttributeError: not a string.
        return False
    return True