-
Notifications
You must be signed in to change notification settings - Fork 0
/
flashtest
executable file
·303 lines (228 loc) · 9.97 KB
/
flashtest
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
#!/usr/bin/env python3
"""Test a flash media device really has the stated capacity"""
import argparse
import errno
import hashlib
import os, os.path
import re
import sys
import tempfile
import unittest
# Hash constructor used for every checksum in this file. SHA256 is sensible,
# and keeping the choice in one place makes it easy to change.
HASH_FUNC = hashlib.sha256
# Rough target for how many files we create on the media; auto_file_size()
# divides the media size by this number. Fine for VFAT __so long as you're
# outside the root dir__
FILES_TO_CREATE = 256
class ChecksumError(Exception):
    """Data read back from a file did not hash to the expected checksum."""
    pass
def eprint(*args, **kwargs):
    """print() wrapper that sends its output to standard error.

    Positional and keyword arguments are forwarded to print() unchanged;
    only the destination stream is fixed to sys.stderr.
    """
    print(*args, **kwargs, file=sys.stderr)
# Multipliers for the unit suffixes parse_size() accepts. All units are binary
# multiples and are looked up in lower case. None stands for "no suffix".
BLOCK_UNITS = {
    None: 1, 'b': 1,
    'k': 2**10, 'kb': 2**10,
    'm': 2**20, 'mb': 2**20,
    'g': 2**30, 'gb': 2**30,
    't': 2**40, 'tb': 2**40,
}
# One or more digits followed by an optional word-character suffix.
RE_SIZE_STR = re.compile(r'^(\d+)(\w*)$')


def parse_size(s):
    """Parse strings like '4M', '512k' into an integer. Only non-fractional
    suffixes make sense: mega & giga, not milli & micro.

    The suffix is case-insensitive and may be any key of BLOCK_UNITS
    (b, k/kb, m/mb, g/gb, t/tb); no suffix means bytes.

    Args:
        s: string containing number & optional SI suffix
    Raises:
        ValueError: String wasn't parseable
    Returns:
        Integer parsed value
    """
    # fullmatch rather than search: '$' alone would also accept a trailing
    # newline, so '4\n' used to parse as 4.
    m = RE_SIZE_STR.fullmatch(s)
    if not m:
        raise ValueError("Couldn't parse size: %r" % (s,))
    number, suffix = m.groups()
    unit = suffix.lower() if suffix else None
    try:
        multiplier = BLOCK_UNITS[unit]
    except KeyError:
        # The KeyError is an implementation detail; don't chain it.
        raise ValueError("Unknown unit suffix: %r" % (suffix,)) from None
    return int(number) * multiplier
def get_free_space(vfs):
    """Report how many bytes can still be written under path `vfs`.

    The figure is the filesystem's available block count (f_bavail)
    multiplied by its fragment size (f_frsize), both taken from os.statvfs.

    Returns:
        int number of bytes free
    """
    stats = os.statvfs(vfs)
    fragment_bytes, free_blocks = stats.f_frsize, stats.f_bavail
    return free_blocks * fragment_bytes
class TestGetFreeSpace(unittest.TestCase):
    """Unit test for get_free_space()."""

    def test_get_free_space(self):
        """Read free space on /"""
        # The root filesystem is expected to have some writable space left.
        self.assertGreater(get_free_space('/'), 0)
def auto_file_size(media_size, files_to_create=None):
    """Return a sensible file size for media of size `media_size`. The
    objective here is not to create too many files, however large the media
    we're testing, while still filling it as much as possible. 256 files is
    a reasonable number.

    Args:
        media_size: int, number of bytes to spread across the files
        files_to_create: int, how many files to split the media into;
            defaults to the module-level FILES_TO_CREATE
    Raises:
        ValueError: media is too small to give every file at least one byte,
            or files_to_create is less than 1
    Returns:
        int size in bytes for each file (always at least 1)
    """
    if files_to_create is None:
        files_to_create = FILES_TO_CREATE
    if files_to_create < 1:
        raise ValueError("Invalid file count; need at least one file")
    # Floor division keeps the arithmetic in integers; float division loses
    # precision once media_size no longer fits in a double's mantissa.
    file_size = int(media_size // files_to_create)
    # A zero-byte file size would make the caller create empty files forever,
    # so treat it the same as implausibly small media.
    if media_size < 100 or file_size < 1:
        raise ValueError("Invalid media size; way too small")
    return file_size
class TestAutoFileSize(unittest.TestCase):
    """Unit tests for auto_file_size()."""

    def test_media_16mb(self):
        """Auto file size for 16Mb media"""
        media_bytes = 16 * 2**20
        self.assertEqual(auto_file_size(media_bytes), 64 * 2**10)

    def test_media_1gb(self):
        """Auto file size for 1Gb media"""
        media_bytes = 2**30
        self.assertEqual(auto_file_size(media_bytes), 4 * 2**20)

    def test_media_small(self):
        """Auto file size for implausibly small media"""
        self.assertRaises(ValueError, auto_file_size, 1)
class ParseSizeAction(argparse.Action):
    """argparse action that stores a size option as an integer byte count."""

    def __call__(self, parser, namespace, values, option_string=None):
        """Parse `values` with parse_size() and store it on `namespace`.

        Args:
            parser: the ArgumentParser invoking this action
            namespace: object on which the parsed value is set
            values: string given on the command line, e.g. '4m'
            option_string: the option that triggered this action, if any
        Raises:
            argparse.ArgumentError: `values` is not a valid size
        """
        try:
            size = parse_size(values)
        except ValueError as e:
            # argparse only turns ArgumentError into a usage message; a bare
            # ValueError raised from an Action escapes as a traceback.
            raise argparse.ArgumentError(self, str(e)) from None
        setattr(namespace, self.dest, size)
class TestParseSize(unittest.TestCase):
    """Unit tests for parse_size()."""

    def test_invalid_size(self):
        """Parse invalid sizes"""
        for text in ('', '4mm', '-1'):
            self.assertRaises(ValueError, parse_size, text)

    def test_valid_size(self):
        """Parse valid sizes"""
        expectations = (
            ('0', 0),
            ('512', 512),
            ('1k', 1024),
            ('1M', 1048576),
            ('1g', 1073741824),
        )
        for text, expected in expectations:
            self.assertEqual(parse_size(text), expected)
class MediaTester:
    """Fill a mounted filesystem with random files, then read every file
    back and compare it with the checksum recorded while writing."""

    def __init__(self, block_size=2**22):
        """
        Args:
            block_size: int, bytes for each write; must be at least 1
        Raises:
            ValueError: block_size is less than 1
        """
        super().__init__()
        # A zero or negative block size would make write_file() loop forever
        # without ever making progress.
        if block_size < 1:
            raise ValueError("block_size must be at least 1 byte")
        self.block_size = block_size

    def test_filesystem(self, media_path, max_files=None, file_size=None, verbose=False):
        """Test writing to a mounted filesystem.

        Files are written into a temporary directory created under
        `media_path`; the directory is removed again when the test ends.

        Args:
            media_path: string, path at which media is mounted
            max_files: int, write no more than N files leaving media unfilled
            file_size: int, bytes per file; by default it is derived from the
                free space reported for `media_path`
            verbose: print filenames as we read/write them
        Raises:
            ValueError: file_size is less than 1
            IOError: a read or write failed (other than the disk filling up)
            ChecksumError: a file read back differently from what was written
        """
        if file_size is None:
            file_size = auto_file_size(get_free_space(media_path))
        # Zero-byte files never fill the media, so the write loop would not end.
        if file_size < 1:
            raise ValueError("file_size must be at least 1 byte")

        checksums = []
        with tempfile.TemporaryDirectory(None, 'ftest_', media_path) as temp_dir:
            print("Testing in %s with files of %d bytes" % (temp_dir, file_size))
            try:
                # Write phase: keep creating files until the media reports
                # full or max_files is reached.
                for path in self.gen_filenames(temp_dir, max_files):
                    if verbose:
                        print("Writing %s" % path)
                    cksum, full = self.write_file(path, file_size)
                    checksums.append((path, cksum))
                    if full:
                        print("Filled media ok")
                        break

                # Read phase: only starts once everything has been written,
                # so early files cannot be verified before later writes have
                # had the chance to overwrite them.
                for path, cksum in checksums:
                    if verbose:
                        print("Reading %s" % path)
                    self.check_file(path, cksum)
            except (IOError, ChecksumError) as e:
                eprint("FAILURE: %s" % str(e))
                raise
            except KeyboardInterrupt:
                print("Aborted")
            else:
                print("SUCCESS: Media wrote/read without errors")

    def test_raw(self, device):
        """Test a raw block device. Not implemented."""
        raise NotImplementedError()

    def gen_filenames(self, base_path, count=None):
        """Generate a predictable sequence of filenames

        Args:
            base_path: Directory the filenames are placed in
            count: Number of filenames to synthesize; None means effectively
                unlimited (sys.maxsize)
        Yields:
            Iterator of string filenames
        """
        if count is None:
            count = sys.maxsize
        spec = "ft{:08d}.dat"
        # Numbering starts at 1: ft00000001.dat, ft00000002.dat, ...
        for n in range(count):
            yield os.path.join(base_path, spec.format(n + 1))

    def gen_block(self, size=None):
        """Return a block of random bytes.

        Args:
            size: int, number of bytes; defaults to self.block_size
        """
        if size is None:
            size = self.block_size
        return os.urandom(size)

    def write_file(self, path, file_size):
        """Write a file up to file_size long full of random data. Writes are
        at most block_size bytes long to limit wear on media; the final write
        is shortened so the file ends at exactly file_size bytes unless the
        device fills up first. File is flushed & fsync-ed at end.

        Args:
            path: Filename to write
            file_size: File will be UP TO N bytes long
        Returns:
            Tuple of (bytes checksum, bool device_full)
        Raises:
            IOError: For write failures other than disk full
        """
        digest = HASH_FUNC()
        device_full = False
        written = 0
        with open(path, 'wb', buffering=0) as f:
            while written < file_size:
                # Shorten the last block so that files smaller than
                # block_size still receive data instead of staying empty.
                block = self.gen_block(min(self.block_size, file_size - written))
                try:
                    size = f.write(block)
                except OSError as e:
                    if e.errno != errno.ENOSPC:
                        raise
                    device_full = True
                    f.seek(written)   # 'undo' failed write; updating cksm
                    f.truncate()      # from a partial block does not work.
                    break             # Yes this is imperfect.
                if not size:
                    # Without progress the loop would never terminate.
                    raise OSError("write to %s made no progress" % path)
                # Unbuffered writes may be short: hash only what was written.
                digest.update(block[:size])
                written += size
            f.flush()             # ensure blocks all written to device; dev
            os.fsync(f.fileno())  # may still lie but it's the best we can do
        return (digest.digest(), device_full)

    def check_file(self, path, checksum):
        """Test file `path` conforms to `checksum`.

        Args:
            path: String of filename
            checksum: Bytes representing expected checksum
        Raises:
            ChecksumError: file did not match expected checksum
        """
        # os.O_RDONLY | os.O_DIRECT - no os.O_DIRECT in OSX
        digest = HASH_FUNC()
        with open(path, 'rb', buffering=0) as f:
            # An empty read marks EOF.
            for block in iter(lambda: f.read(self.block_size), b''):
                digest.update(block)
        if digest.digest() != checksum:
            # Name the file so the FAILURE line printed by the caller is useful.
            raise ChecksumError("Checksum mismatch reading back %s" % path)
# Command-line interface. Size options go through ParseSizeAction so that
# values like '8k' or '16m' end up as integer byte counts.
parser = argparse.ArgumentParser(
    description='Test flash media by writing & validating random files.',
    epilog="""
Counterfeit flash media typically fakes its size by overwriting existing blocks. This means writes succeed, and you only discover something is wrong when you later try to read your data. Flashtest fills your device with random files then uses checksums to validate the data can be re-read.
Also useful for detecting genuine-but-worn-out media.
NB: O_DIRECT is unavailable on OSX. Thus there's no way (without a `sudo purge`) to fully disable the OS's block cache. It is assumed that your flash media is sufficiently larger than ram for this not to matter :)
No warranty but it works for me. Be aware that repeatedly writing to your flash wears it out.
"""
)
parser.add_argument('-b', '--blocksize', dest='blocksize', default=4194304,
                    action=ParseSizeAction,
                    help='block size (default 4m) - either an integer byte count or SI unit like 512b, 8k, 16m etc.')
parser.add_argument('-f', '--filesize', dest='filesize', default=None,
                    action=ParseSizeAction,
                    help='file size - either an integer byte count or SI unit like 512b, 8k, 16m etc. Default is to automatically choose based on media size.')
parser.add_argument('-c', '--count', dest='count', default=None, type=int,
                    help='limit written files (default is to fill media)')
parser.add_argument('-t', '--test', default=False,
                    action='store_true',
                    help='run unit tests')
parser.add_argument('-v', '--verbose', default=False,
                    action='store_true',
                    help='verbose output')
# nargs='?' is what makes default='.' take effect: argparse ignores the
# default of a positional that is required. It also lets `-t` run the unit
# tests without a dummy path argument.
parser.add_argument('path', nargs='?', default='.',
                    help="path to write data, where your flash is mounted (default: current directory)")
if __name__ == '__main__':
    # Script entry point: either run the built-in unit tests or test the
    # media mounted at the given path.
    args = parser.parse_args()
    if not args.test:
        tester = MediaTester(args.blocksize)
        tester.test_filesystem(args.path, args.count, args.filesize, verbose=args.verbose)
    else:
        # Hand unittest only the program name so it does not try to
        # interpret flashtest's own command-line options.
        unittest.main(argv=sys.argv[:1])
        sys.exit()