Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Newer
Older
100644 202 lines (169 sloc) 6.974 kB
370ff3c Initial Import
derks authored
1 import sys, os
2 import platform
3 import unittest
4 import tempfile
5 import string
6 import random
7 import logging
8 import logging.handlers as handlers
9 import md5
10 from shutil import rmtree
11
12 # FIXME: This used to test holland.helpers which was
13 # a collection of utility methods. These have
14 # since been forked off into various plugins
15 # or merged into holland.core.util. This
16 # should be updated to test holland.core.util
17 # and any other tests added to the appropriate
18 # plugin egg's test suite.
19
20 #class TestSysUtilsHelper(unittest.TestCase):
# HACK: the test case is disabled until it is fixed. Many of these
# functions have been moved into individual plugins, and these tests
# need to be merged into those plugins' test cases.
class Test(object):
    """
    Disabled test cases for the sysutils helper functions.

    The class derives from ``object`` instead of ``unittest.TestCase``,
    so unittest loaders do not pick it up and the ``assert*`` methods
    used below are not available until the TestCase base is restored.

    NOTE(review): every test calls the helpers through the name ``h``,
    which this module never binds -- presumably the old
    ``holland.helpers`` module; confirm and import it before
    re-enabling these tests.
    """

    # nose and pytest collect classes by name, so ``Test`` would still
    # be picked up while disabled.  Remove this line when the class is
    # turned back into a real TestCase.
    __test__ = False

    def setUp(self):
        """Send 'holland' log output to the null device during a test."""
        self.log = logging.getLogger('holland')
        # Keep a reference so tearDown() can detach and close it again.
        self._log_handler = logging.FileHandler(filename=os.devnull)
        self.log.addHandler(self._log_handler)

    def tearDown(self):
        """Detach and close the handler installed by setUp(), if any."""
        handler = getattr(self, '_log_handler', None)
        if handler is not None:
            # Without this every test would leave one more open handler
            # attached to the shared 'holland' logger.
            self.log.removeHandler(handler)
            handler.close()
            self._log_handler = None

    def _file_digest(self, path):
        """Return the MD5 digest of the file at `path`, closing it after."""
        handle = open(path)
        try:
            return md5.new(handle.read()).digest()
        finally:
            handle.close()

    def test_ensure_dir(self):
        # No arguments
        self.assertRaises(TypeError, h.ensure_dir)
        # Directory that already exists
        self.assertEqual(h.ensure_dir('/tmp'), True)
        # File that already exists
        self.assertEqual(h.ensure_dir('/dev/null'), True)
        try:
            # Directory that does not exist
            self.assertEqual(h.ensure_dir('/tmp/testdir'), True)
            # Directory that cannot be created
            self.assertRaises(OSError, h.ensure_dir, '/dev/null/dir')
        finally:
            # Cleanup, also when one of the assertions above fails
            if os.path.isdir('/tmp/testdir'):
                os.rmdir('/tmp/testdir')

    def test_protected_path(self):
        fd, file_path = tempfile.mkstemp(prefix='holland-test-')
        # Only the path is used; do not leak the open descriptor.
        os.close(fd)
        dir_path = tempfile.mkdtemp(prefix='holland-test-')
        try:
            # file
            self.assertEqual(h.protected_path(file_path),
                             "%s.0" % file_path)
            # dir
            self.assertEqual(h.protected_path(dir_path),
                             "%s.0" % dir_path)
        finally:
            # clean up
            os.remove(file_path)
            rmtree(dir_path)

    def test_get_compression_stream(self):
        for c_mode in ['gzip', 'bzip2']:
            fd, file_path = tempfile.mkstemp(prefix='holland-test-')
            # Only the path is used; do not leak the open descriptor.
            os.close(fd)
            dir_path = tempfile.mkdtemp(prefix='holland-test-dir')
            file_path = os.path.realpath(file_path)
            # The stream is handed a path that does not exist yet.
            os.remove(file_path)
            dir_path = os.path.realpath(dir_path)
            new_file_path = None
            try:
                # join() instead of repeated '+' keeps this linear.
                data = ''.join([random.choice(string.letters)
                                for _ in xrange(1024 ** 2)])

                stream = h.get_compression_stream(output_path=file_path,
                                                  mode=c_mode)
                try:
                    stream.write(data)
                finally:
                    stream.close()

                new_file_path = h.decompress_path(
                    source_path=file_path, dest_dir=dir_path, mode=c_mode
                )

                f = open(new_file_path, 'r')
                try:
                    restored = f.read()
                finally:
                    f.close()
                # The round trip must give back exactly what was written.
                self.assertEqual(md5.new(restored).digest(),
                                 md5.new(data).digest())
            finally:
                # clean up
                if new_file_path is not None and \
                        os.path.exists(new_file_path):
                    os.remove(new_file_path)
                rmtree(dir_path)

    def test_compress_path(self):
        # Test to see if a file can be gzipped and ungzipped
        # (and it returns the same md5sum)
        fd, file_path = tempfile.mkstemp(prefix='holland-test-')
        dir_path = tempfile.mkdtemp(prefix='holland-test-dir')
        file_path = os.path.realpath(file_path)
        dir_path = os.path.realpath(dir_path)
        try:
            # Create and compress the file
            handle = os.fdopen(fd, 'w')
            try:
                handle.write(''.join([random.choice(string.letters)
                                      for _ in xrange(1024 ** 2)]))
            finally:
                handle.close()
            comp_path = h.compress_path(
                source_path=file_path, dest_dir=dir_path,
                remove_source=False, mode='gzip'
            )
            self.assertNotEqual(comp_path, None)

            # Uncompress the file and compare to original
            uncomp_path = h.decompress_path(
                source_path=comp_path, dest_dir=dir_path,
                remove_source=False, mode='gzip'
            )
            self.assertNotEqual(uncomp_path, None)

            self.assertEqual(self._file_digest(file_path),
                             self._file_digest(uncomp_path))
        finally:
            # clean up the source file and everything written to dir_path
            if os.path.exists(file_path):
                os.remove(file_path)
            rmtree(dir_path)

    # Platform-specific tests
    # FIXME: Tests are incomplete and have not been tested on the
    # Linux platform

    def test_mount_info(self):
        self.assertRaises(TypeError, h.mount_info)
        if platform.system() != 'Linux':
            print("Skipping Test For This Platform (%s)" % platform.system())
            return False

    def test_which(self):
        # No arguments given
        self.assertRaises(TypeError, h.which)
        if platform.system() == 'Windows':
            print("Skipping Test For This Platform (%s)" % platform.system())
            return False
        # Common utility test
        # NOTE(review): assumes the first `ls` found is /bin/ls -- confirm
        # this holds on the systems the suite is meant to run on.
        self.assertEqual(h.which('ls'), '/bin/ls')
        # Not found test
        self.assertRaises(OSError, h.which, 'notacommand')

    # FIXME: Incomplete Test
    def test_relpath(self):
        # No arguments given
        self.assertRaises(TypeError, h.relpath)
        if platform.system() == 'Windows':
            print("Skipping Test For This Platform (%s)" % platform.system())
            return False
        # Same Path
        self.assertEqual(h.relpath('test', 'test'), '')
        # Empty Path
        self.assertEqual(h.relpath('', ''), '')
        # Sub-Path
        self.assertEqual(h.relpath('/tmp/test', '/test'), None)

    # End of platform-specific tests

    def test_format_bytes(self):
        # No arguments given
        self.assertRaises(TypeError, h.format_bytes)
        # 0 bytes
        self.assertEqual(h.format_bytes(0), '0.00B')
        # 1b
        self.assertEqual(h.format_bytes(1), '1.00B')
        # 1KiB
        self.assertEqual(h.format_bytes(1024), '1.00KiB')
        # Remaining tests for the other units.
        # Powers start at 2 because 1024**0 ('1b') and 1024**1 ('1KiB')
        # are covered above; the unit names were taken from the array
        # in the original function.
        units = ['MiB', 'GiB', 'TiB', 'PiB', 'EiB', 'ZiB', 'YiB']
        for offset, unit in enumerate(units):
            self.assertEqual(h.format_bytes(1024 ** (offset + 2)),
                             '1.00' + unit)
        # Negative Bytes
        self.assertRaises(ArithmeticError, h.format_bytes, -1)
192
193
def suite():
    """
    Build the test suite for this module.

    Returns a ``unittest.TestSuite``.  It holds the tests of
    ``TestSysUtilsHelper`` when this module defines that name as a
    ``unittest.TestCase`` subclass; otherwise the suite is empty, so
    calling suite() no longer fails with NameError while that test
    case is not defined.
    """
    tests = unittest.TestSuite()
    case = globals().get('TestSysUtilsHelper')
    try:
        is_test_case = issubclass(case, unittest.TestCase)
    except TypeError:
        # The name is missing (None) or bound to something that is not
        # a class at all.
        is_test_case = False
    if is_test_case:
        tests.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(case))
    return tests
198
if __name__ == '__main__':
    # unittest.main() runs the tests found in this module and then
    # leaves the interpreter through sys.exit(), so no statement placed
    # after it is ever executed.
    unittest.main()
Something went wrong with that request. Please try again.