This repository has been archived by the owner on May 4, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 2
/
test_exif2timestream.py
471 lines (429 loc) · 17.7 KB
/
test_exif2timestream.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
from copy import deepcopy
import exif2timestream as e2t
from hashlib import md5
import os
from os import path
from shutil import rmtree, copytree
import time
from time import strptime
import unittest
from voluptuous import MultipleInvalid
from tempfile import NamedTemporaryFile
import pexif
import warnings
SKIMAGE = False
try:
import skimage
# SKIMAGE = True
except ImportError:
pass
class TestExifTraitcapture(unittest.TestCase):
dirname = path.dirname(__file__)
test_config_csv = path.join(dirname, "config.csv")
test_config_dates_csv = path.join(dirname, "config_dates.csv")
test_config_raw_csv = path.join(dirname, "config_raw.csv")
bad_header_config_csv = path.join(dirname, "bad_header_config.csv")
bad_values_config_csv = path.join(dirname, "bad_values_config.csv")
unused_bad_cam_csv = path.join(dirname, "unused_cams_with_bad_values.csv")
out_dirname = path.join(dirname, "out")
camupload_dir = path.join(dirname, "img", "camupload")
noexif_testfile = path.join(dirname, "img", "IMG_0001_NOEXIF.JPG")
jpg_testfile = path.join(camupload_dir, "jpg", "IMG_0001.JPG")
raw_testfile = path.join(camupload_dir, "raw", "IMG_0001.CR2")
camera_win32 = {
'ARCHIVE_DEST': '\\'.join([out_dirname, 'archive']),
'CURRENT_EXPT': 'BVZ00000',
'DESTINATION': '\\'.join([out_dirname, 'timestreams']),
'CAM_NUM': 1,
'EXPT_END': '2013_12_31',
'EXPT_START': '2013_11_01',
'LOCATION': 'EUC-R01C01',
'METHOD': 'copy',
'IMAGE_TYPES': 'raw~jpg',
'INTERVAL': '5',
'mode': 'batch',
'resolutions': 'original~1024x768~640x480',
'SOURCE': '\\'.join([dirname, "img", "camupload"]),
'sunrise': '500',
'sunset': '2200',
'camera_timezone': '1100',
'USE': '1',
'user': 'Glasshouses'
}
camera_unix = {
'ARCHIVE_DEST': '/'.join([out_dirname, 'archive']),
'CURRENT_EXPT': 'BVZ00000',
'DESTINATION': '/'.join([out_dirname, 'timestreams']),
'CAM_NUM': 1,
'EXPT_END': '2013_12_31',
'EXPT_START': '2013_11_01',
'LOCATION': 'EUC-R01C01',
'METHOD': 'copy',
'INTERVAL': '5',
'IMAGE_TYPES': 'raw~jpg',
'mode': 'batch',
'resolutions': 'original~1024x768~640x480',
'SOURCE': '/'.join([dirname, "img", "camupload"]),
'sunrise': '500',
'sunset': '2200',
'camera_timezone': '1100',
'USE': '1',
'user': 'Glasshouses'
}
r_fullres_path = path.join(
out_dirname, "timestreams", "BVZ00000",
'BVZ00000-EUC-R01C01-C01~fullres-orig', '2013', '2013_11',
'2013_11_12', '2013_11_12_20',
'BVZ00000-EUC-R01C01-C01~fullres-orig_2013_11_12_20_55_00_00.JPG'
)
r_raw_path = path.join(
out_dirname, "timestreams", "BVZ00000",
'BVZ00000-EUC-R01C01-C01~fullres-raw', '2013', '2013_11',
'2013_11_12', '2013_11_12_20',
'BVZ00000-EUC-R01C01-C01~fullres-raw_2013_11_12_20_55_00_00.CR2'
)
maxDiff = None
# helpers
def _md5test(self, filename, expected_hash):
with open(filename, "rb") as fh:
out_contents = fh.read()
md5hash = md5()
md5hash.update(out_contents)
md5hash = md5hash.hexdigest()
self.assertEqual(md5hash, expected_hash)
# setup
def setUp(self):
if path.sep == "/":
self.camera_raw = deepcopy(self.camera_unix)
self.camera = deepcopy(self.camera_unix)
else:
self.camera_raw = deepcopy(self.camera_win32)
self.camera = deepcopy(self.camera_win32)
img_dir = path.dirname(self.camera[e2t.FIELDS['source']])
if not path.exists(img_dir):
os.mkdir(img_dir)
if not path.exists(self.out_dirname):
os.mkdir(self.out_dirname)
if not path.exists(self.camera[e2t.FIELDS['destination']]):
os.mkdir(self.camera[e2t.FIELDS['destination']])
if not path.exists(self.camera[e2t.FIELDS['archive_dest']]):
os.mkdir(self.camera[e2t.FIELDS['archive_dest']])
rmtree(img_dir)
copytree("./test/unburnable", img_dir)
self.camera = e2t.validate_camera(self.camera)
# test for localise_cam_config
def test_localise_cam_config(self):
self.assertDictEqual(
e2t.localise_cam_config(self.camera_win32),
self.camera_raw)
self.assertDictEqual(
e2t.localise_cam_config(self.camera_unix),
self.camera_raw)
# tests for round_struct_time
def test_round_struct_time_gmt(self):
start = time.strptime("20131112 205309", "%Y%m%d %H%M%S")
rnd_5 = e2t.round_struct_time(start, 300, tz_hrs=11, uselocal=False)
rnd_5_expt = time.strptime("20131112 095500", "%Y%m%d %H%M%S")
self.assertIsInstance(rnd_5, time.struct_time)
self.assertEqual(time.mktime(rnd_5), time.mktime(rnd_5_expt))
def test_round_struct_time_local(self):
start = time.strptime("20131112 205309", "%Y%m%d %H%M%S")
rnd_5 = e2t.round_struct_time(start, 300, tz_hrs=11)
rnd_5_expt = time.strptime("20131112 205500", "%Y%m%d %H%M%S")
self.assertIsInstance(rnd_5, time.struct_time)
self.assertEqual(time.mktime(rnd_5), time.mktime(rnd_5_expt))
# tests for _dont_clobber
def test_dont_clobber(self):
stop = e2t.SkipImage()
fh = NamedTemporaryFile()
fn = fh.name
# test raise/exception mode
with self.assertRaises(e2t.SkipImage):
e2t._dont_clobber(fn, mode=e2t.SkipImage)
with self.assertRaises(e2t.SkipImage):
e2t._dont_clobber(fn, mode=stop)
# test with bad mode
with self.assertRaises(ValueError):
e2t._dont_clobber(fn, mode="BADMODE")
# test append mode
expt = fn + "_1"
self.assertEqual(e2t._dont_clobber(fn), expt)
# test append mode with file extension
fn_ext = fn + ".txt"
with open(fn_ext, "w") as fh:
fh.write("This file will exist") # make a file with an extension
e_base, e_ext = path.splitext(fn_ext)
expt = ".".join(["_".join([e_base, "1"]), e_ext])
self.assertEqual(e2t._dont_clobber(fn_ext), expt)
os.unlink(fn_ext) # we have to remove this ourselves
# test append mode with file that doesn't exist
wontexist = fn + "_shouldnteverexist"
self.assertEqual(e2t._dont_clobber(wontexist), wontexist)
# tests for get_file_date
def test_get_file_date_jpg(self):
actual = time.strptime("20131112 205309", "%Y%m%d %H%M%S")
jpg_date = e2t.get_file_date(self.jpg_testfile)
self.assertEqual(jpg_date, actual)
def test_get_file_date_raw(self):
actual = time.strptime("20131112 205309", "%Y%m%d %H%M%S")
raw_date = e2t.get_file_date(self.raw_testfile)
self.assertEqual(raw_date, actual)
def test_get_file_date_noexif(self):
date = e2t.get_file_date(self.noexif_testfile)
self.assertIsNone(date)
# tests for get_new_file_name
def test_get_new_file_name(self):
date = time.strptime("20131112 205309", "%Y%m%d %H%M%S")
fn = e2t.get_new_file_name(date, 'test')
self.assertEqual(fn, ("2013/2013_11/2013_11_12/2013_11_12_20/"
"test_2013_11_12_20_53_09_00.jpg"))
def test_get_new_file_date_from_file(self):
date = e2t.get_file_date(self.jpg_testfile)
fn = e2t.get_new_file_name(date, 'test')
self.assertEqual(fn, ("2013/2013_11/2013_11_12/2013_11_12_20/"
"test_2013_11_12_20_53_09_00.jpg"))
date = e2t.get_file_date(self.jpg_testfile, round_secs=5 * 60)
fn = e2t.get_new_file_name(date, 'test')
self.assertEqual(fn, ("2013/2013_11/2013_11_12/2013_11_12_20/"
"test_2013_11_12_20_55_00_00.jpg"))
def test_get_new_file_nulls(self):
date = time.strptime("20131112 205309", "%Y%m%d %H%M%S")
with self.assertRaises(ValueError):
e2t.get_new_file_name(None, 'test')
with self.assertRaises(ValueError):
e2t.get_new_file_name(date, '')
with self.assertRaises(ValueError):
e2t.get_new_file_name(date, None)
with self.assertRaises(ValueError):
e2t.get_new_file_name(None, '')
# tests for make_timestream_name
def test_make_timestream_name_empty(self):
name = e2t.make_timestream_name(self.camera)
exp = 'BVZ00000-EUC-R01C01-C01~fullres-orig'
self.assertEqual(name, exp)
def test_make_timestream_name_params(self):
name = e2t.make_timestream_name(
self.camera,
res="1080x720",
step="clean")
exp = 'BVZ00000-EUC-R01C01-C01~1080x720-clean'
self.assertEqual(name, exp)
# tests for find_image_files
def test_find_image_files(self):
expt = {"jpg": {path.join(self.camupload_dir, x) for x in [
'jpg/IMG_0001.JPG',
'jpg/IMG_0002.JPG',
'jpg/IMG_0630.JPG',
'jpg/IMG_0633.JPG',
'jpg/whroo20131104_020255M.jpg']
},
"raw": {path.join(self.camupload_dir, 'raw/IMG_0001.CR2')},
}
got = e2t.find_image_files(self.camera)
self.assertSetEqual(set(got["jpg"]), expt["jpg"])
self.assertSetEqual(set(got["jpg"]), expt["jpg"])
def test_get_local_path(self):
winpath = "test\\path\\to\\file"
unixpath = "test/path/to/file"
if os.path.sep == "\\":
# windows
self.assertEqual(e2t.get_local_path(winpath), winpath)
self.assertEqual(e2t.get_local_path(unixpath), winpath)
elif os.path.sep == "/":
# unix
self.assertEqual(e2t.get_local_path(winpath), unixpath)
self.assertEqual(e2t.get_local_path(unixpath), unixpath)
else:
raise ValueError("Non-Unix/Win path seperator '%s' not supported" %
os.path.sep)
# tests for timestreamise_image
def test_timestreamise_image(self):
e2t.timestreamise_image(self.jpg_testfile, self.camera)
self.assertTrue(path.exists(self.r_fullres_path))
self._md5test(self.r_fullres_path, "76ee6fb2f5122d2f5815101ec66e7cb8")
# tests for process_image
def test_process_image(self):
e2t.process_image((self.jpg_testfile, self.camera, "jpg"))
self.assertTrue(path.exists(self.r_fullres_path))
self._md5test(self.r_fullres_path, "76ee6fb2f5122d2f5815101ec66e7cb8")
def test_process_image_map(self):
map(e2t.process_image, [(self.jpg_testfile, self.camera, "jpg")])
self.assertTrue(path.exists(self.r_fullres_path))
self._md5test(self.r_fullres_path, "76ee6fb2f5122d2f5815101ec66e7cb8")
# tests for parse_camera_config_csv
def test_parse_camera_config_csv(self):
configs = [
{
'ARCHIVE_DEST': './test/out/archive',
'camera_timezone': (11, 0),
'CURRENT_EXPT': 'BVZ00000',
'DESTINATION': './test/out/timestreams',
'CAM_NUM': 1,
'EXPT_END': strptime('2013_12_31', "%Y_%m_%d"),
'EXPT_START': strptime('2012_12_01', "%Y_%m_%d"),
'INTERVAL': 5,
'IMAGE_TYPES': ["jpg"],
'LOCATION': 'EUC-R01C01',
'METHOD': 'move',
'mode': 'batch',
'resolutions': ['original', (1024, 768), (640, 480)],
'SOURCE': './test/img/camupload',
'sunrise': (5, 0),
'sunset': (22, 0),
'USE': True,
'user': 'Glasshouses'
}
]
result = e2t.parse_camera_config_csv(self.test_config_csv)
for expt, got in zip(configs, result):
self.assertDictEqual(got, expt)
def test_unused_bad_camera(self):
# first entry is invalid but not used, should return None
got = list(e2t.parse_camera_config_csv(self.unused_bad_cam_csv))
self.assertListEqual(got, [])
# The case of "invalid but used" is dealt with in
# test_parse_camera_config_csv_badconfig
def test_parse_camera_config_csv_badconfig(self):
with self.assertRaises(KeyError):
list(e2t.parse_camera_config_csv(self.bad_header_config_csv))
with self.assertRaises(MultipleInvalid):
list(e2t.parse_camera_config_csv(self.bad_values_config_csv))
# tests for generate_config_csv
def test_generate_config_csv(self):
out_csv = path.join(self.out_dirname, "test_gencnf.csv")
e2t.generate_config_csv(out_csv)
self._md5test(out_csv, "bf1ff915a42390a15ab8e4704e5c38e9")
# Tests for checking parsing of dates from filename
def test_check_date_parse(self):
got = e2t.get_time_from_filename(
"whroo20141101_001212M.jpg", "%Y%m%d_%H%M%S")
expected = strptime("20141101_001212", "%Y%m%d_%H%M%S")
self.assertEqual(got, expected)
got = e2t.get_time_from_filename("TRN-NC-DSC-01~640_2013_06_01_10_45_00_00.jpg",
"%Y_%m_%d_%H_%M_%S")
expected = strptime("2013_06_01_10_45_00", "%Y_%m_%d_%H_%M_%S")
self.assertEqual(got, expected)
def test_check_write_exif(self):
# Write To Exif
filename = 'jpg/whroo20131104_020255M.jpg'
date_time = e2t.get_time_from_filename(
path.join(self.camupload_dir, filename), "%Y%m%d_%H%M%S")
e2t.write_exif_date(path.join(self.camupload_dir, filename), date_time)
# Read From Exif
exif_tags = pexif.JpegFile.fromFile(
path.join(self.camupload_dir, filename))
str_date = exif_tags.exif.primary.ExtendedEXIF.DateTimeOriginal
date = strptime(str_date, "%Y:%m:%d %H:%M:%S")
# Check Equal
self.assertEqual(date_time, date)
# Tests for checking image resizing
def test_check_resize_img(self):
if(SKIMAGE):
filename = 'jpg/whroo20131104_020255M.jpg'
new_width = 400
e2t.resize_img(path.join(self.camupload_dir, filename), new_width)
img = skimage.io.imread(path.join(self.camupload_dir, filename))
w = skimage.novice.open(
path.join(self.camupload_dir, filename)).width
self.assertEqual(w, new_width)
else:
warnings.warn(
"Skimage Not Installed, Unable to Test Resize", ImportWarning)
# tests for main function
def test_main(self):
e2t.main({
'-1': False,
'-d': False,
'-a': None,
'-c': self.test_config_csv,
'-l': self.out_dirname,
'-m': None,
'-g': None,
'-t': None})
#os.system("tree %s" % path.dirname(self.out_dirname))
self.assertTrue(path.exists(self.r_fullres_path))
def test_main_raw(self):
e2t.main({
'-1': False,
'-d': False,
'-a': None,
'-c': self.test_config_raw_csv,
'-l': self.out_dirname,
'-m': None,
'-g': None,
'-t': None})
#os.system("tree %s" % path.dirname(self.out_dirname))
self.assertTrue(path.exists(self.r_fullres_path))
self.assertTrue(path.exists(self.r_raw_path))
def test_main_expt_dates(self):
e2t.main({
'-1': False,
'-d': False,
'-a': None,
'-c': self.test_config_dates_csv,
'-l': self.out_dirname,
'-m': None,
'-g': None,
'-t': None})
#os.system("tree %s" % path.dirname(self.out_dirname))
self.assertFalse(path.exists(self.r_fullres_path))
def test_main_threads(self):
# with a good value for threads
e2t.main({
'-1': False,
'-d': False,
'-a': None,
'-c': self.test_config_csv,
'-l': self.out_dirname,
'-m': None,
'-g': None,
'-t': '2'})
self.assertTrue(path.exists(self.r_fullres_path))
def test_main_threads_bad(self):
# and with a bad one (should default back to n_cpus)
e2t.main({
'-1': False,
'-d': False,
'-a': None,
'-c': self.test_config_csv,
'-l': self.out_dirname,
'-m': None,
'-g': None,
'-t': "several"})
self.assertTrue(path.exists(self.r_fullres_path))
def test_main_threads_one(self):
# and with -1
e2t.main({
'-1': True,
'-d': False,
'-a': None,
'-c': self.test_config_csv,
'-l': self.out_dirname,
'-m': None,
'-g': None,
'-t': None})
self.assertTrue(path.exists(self.r_fullres_path))
# IMG0001.JPG should always be the first one, with one core it's
# deterministic
self._md5test(self.r_fullres_path, "76ee6fb2f5122d2f5815101ec66e7cb8")
def test_main_generate(self):
conf_out = path.join(self.out_dirname, "config.csv")
with self.assertRaises(SystemExit):
e2t.main({
'-1': False,
'-d': False,
'-a': None,
'-c': None,
'-l': self.out_dirname,
'-m': None,
'-g': conf_out,
'-t': None})
self.assertTrue(path.exists(conf_out))
self._md5test(conf_out, "bf1ff915a42390a15ab8e4704e5c38e9")
def tearDown(self):
#os.system("tree %s" % path.dirname(self.out_dirname))
rmtree(self.out_dirname)
# pass
if __name__ == "__main__":
runner = unittest.TextTestRunner(verbosity=3)
unittest.main(testRunner=runner)