From 8256e850e6c29751d59fa6819b20974042b68b95 Mon Sep 17 00:00:00 2001 From: Glenn Matthews Date: Wed, 5 Aug 2015 11:29:08 -0400 Subject: [PATCH] Added UT automation for file_reference module. --- COT/file_reference.py | 32 +++---- COT/ovf.py | 31 ++++--- COT/tests/test.tar | Bin 0 -> 3072 bytes COT/tests/test_file_reference.py | 145 +++++++++++++++++++++++++++++++ 4 files changed, 179 insertions(+), 29 deletions(-) create mode 100644 COT/tests/test.tar create mode 100644 COT/tests/test_file_reference.py diff --git a/COT/file_reference.py b/COT/file_reference.py index b26e06f..de5cd23 100644 --- a/COT/file_reference.py +++ b/COT/file_reference.py @@ -38,12 +38,18 @@ def __init__(self, file_path, filename=None): else: self.file_path = os.path.join(file_path, filename) self.filename = filename + if not self.exists(): + raise IOError("File {0} does not exist!".format(self.file_path)) self.obj = None def exists(self): """Check whether the file exists on disk.""" return os.path.exists(self.file_path) + def size(self): + """Get the size of this file, in bytes.""" + return os.path.getsize(self.file_path) + def open(self, mode): """Open the file and return a reference to the file object.""" self.obj = open(self.file_path, mode) @@ -60,12 +66,6 @@ def copy_to(self, dest_dir): logger.info("Copying {0} to {1}".format(self.file_path, dest_dir)) shutil.copy(self.file_path, dest_dir) - def size(self): - """Get the size of this file, in bytes.""" - if self.exists(): - return os.path.getsize(self.file_path) - return None - def add_to_archive(self, tarf): """Copy this file into the given tarfile object.""" logger.info("Adding {0} to TAR file as {1}" @@ -79,9 +79,14 @@ class FileInTAR(object): def __init__(self, tarfile_path, filename): """Create a reference to a file contained in a TAR archive.""" + if not tarfile.is_tarfile(tarfile_path): + raise IOError("{0} is not a valid TAR file.".format(tarfile_path)) self.tarfile_path = tarfile_path - self.file_path = None self.filename = filename + if not self.exists(): + raise IOError("{0} does not exist in {1}" + .format(filename, tarfile_path)) + self.file_path = None self.tarf = None self.obj = None @@ -94,6 +99,11 @@ def exists(self): except KeyError: return False + def size(self): + """Get the size of this file in bytes.""" + with closing(tarfile.open(self.tarfile_path, 'r')) as tarf: + return tarf.getmember(self.filename).size + def open(self, mode): """Open the TAR and return a reference to the relevant file object.""" # We can only extract a file object from a TAR file in read mode. @@ -119,14 +129,6 @@ def copy_to(self, dest_dir): .format(self.filename, self.tarfile_path, dest_dir)) tarf.extract(self.filename, dest_dir) - def size(self): - """Get the size of this file in bytes.""" - with closing(tarfile.open(self.tarfile_path, 'r')) as tarf: - try: - return tarf.getmember(self.filename).size - except KeyError: - return None - def add_to_archive(self, tarf): """Copy this file into the given tarfile object.""" self.open('r') diff --git a/COT/ovf.py b/COT/ovf.py index 410883c..5593228 100644 --- a/COT/ovf.py +++ b/COT/ovf.py @@ -316,22 +316,22 @@ def __init__(self, input_file, output_file): # make sure they look sane, and store file references for later. file_list = [f.get(self.FILE_HREF) for f in self.references.findall(self.FILE)] - self.invalid_files = [] if self.input_file == self.ovf_descriptor: # Check files in the directory referenced by the OVF descriptor - input_dir = os.path.dirname(self.ovf_descriptor) - for f in file_list: - self._file_references[f] = FileOnDisk(input_dir, f) + input_path = os.path.dirname(self.ovf_descriptor) + ref_cls = FileOnDisk else: # OVA - check contents of TAR file. - for f in file_list: - self._file_references[f] = FileInTAR(self.input_file, f) + input_path = self.input_file + ref_cls = FileInTAR - for ref in self._file_references.values(): - if not ref.exists(): + for f in file_list: + try: + self._file_references[f] = ref_cls(input_path, f) + except IOError: logger.error("File '{0}' referenced in the OVF descriptor " "does not exist.".format(f)) - self.invalid_files.append(f) + self._file_references[f] = None except Exception as e: self.destroy() @@ -626,11 +626,14 @@ def validate_and_update_file_references(self): href = file_elem.get(self.FILE_HREF) file_ref = self._file_references[href] - if not file_ref.exists(): - if href not in self.invalid_files: - # otherwise we already warned about it at read time. - logger.error("Referenced file '{0}' does not exist!" - .format(href)) + if file_ref is not None and not file_ref.exists(): + # file used to exist but no longer does?? + logger.error("Referenced file '{0}' does not exist!" + .format(href)) + self._file_references[href] = None + file_ref = None + + if file_ref is None: # TODO this should probably have a confirm() check... logger.warning("Removing reference to missing file {0}" .format(href)) diff --git a/COT/tests/test.tar b/COT/tests/test.tar new file mode 100644 index 0000000000000000000000000000000000000000..ffb5021cf4536cb36ebfd709a4b100cf1420a332 GIT binary patch literal 3072 zcmeHHJ#WJx5Y6mgLAG{D4-5w$I&`Sop<91Ib$lgOVmpdS+Mi$INKw)uo25oF496Y3 z)7^td-EQ}3(Ztz^0{~D;Hp^C{8Z3yDpF;pbX~u<8N=hY4WWY;R!i>R)HJwxM`$W53 z+H%uSN~>_*__N5z;w_ejPTu!dG=80-0;MA&!I69=Of({Dp{)h?5Wvq!k3H7MfyqO)SabqQ~#V}*8frSTyi|> zUn1h9|8Z{PI_tcs+Z1*&3yrnC|$lRJ460}}%i1OJ$TUmI|J;s5{u literal 0 HcmV?d00001 diff --git a/COT/tests/test_file_reference.py b/COT/tests/test_file_reference.py new file mode 100644 index 0000000..6867dd3 --- /dev/null +++ b/COT/tests/test_file_reference.py @@ -0,0 +1,145 @@ +#!/usr/bin/env python +# +# test_file_reference.py - Unit test cases for COT file reference handling +# +# August 2015, Glenn F. Matthews +# Copyright (c) 2015 the COT project developers. +# See the COPYRIGHT.txt file at the top-level directory of this distribution +# and at https://github.com/glennmatthews/cot/blob/master/COPYRIGHT.txt. +# +# This file is part of the Common OVF Tool (COT) project. +# It is subject to the license terms in the LICENSE.txt file found in the +# top-level directory of this distribution and at +# https://github.com/glennmatthews/cot/blob/master/LICENSE.txt. No part +# of COT, including this file, may be copied, modified, propagated, or +# distributed except according to the terms contained in the LICENSE.txt file. + +"""Unit test cases for COT.file_reference classes.""" + +import os +import tarfile + +from contextlib import closing +from pkg_resources import resource_filename + +from COT.tests.ut import COT_UT +from COT.file_reference import FileOnDisk, FileInTAR + + +class TestFileOnDisk(COT_UT): + + """Test cases for FileOnDisk class.""" + + def test_nonexistent_file(self): + """Test error handling when the file doesn't exist.""" + self.assertRaises(IOError, FileOnDisk, "/foo/bar.txt") + self.assertRaises(IOError, FileOnDisk, "/foo", "bar.txt") + + def test_exists(self): + """Test the exists() API.""" + self.assertTrue(FileOnDisk(self.input_ovf).exists()) + # false case is covered by test_nonexistent_file + + def test_size(self): + """Test the size() API.""" + self.assertEqual(FileOnDisk(self.input_ovf).size(), + os.path.getsize(self.input_ovf)) + + def test_open_close(self): + """Test the open() and close() APIs.""" + ref = FileOnDisk(self.input_ovf) + ref_obj = ref.open('r') + try: + file_obj = open(self.input_ovf, 'r') + try: + self.assertEqual(file_obj.readline(), ref_obj.readline()) + finally: + file_obj.close() + finally: + ref.close() + + def test_copy_to(self): + """Test the copy_to() API.""" + FileOnDisk(self.input_ovf).copy_to(self.temp_dir) + self.check_diff("", file2=os.path.join(self.temp_dir, 'input.ovf')) + + def test_add_to_archive(self): + """Test the add_to_archive() API.""" + output_tarfile = os.path.join(self.temp_dir, 'test_output.tar') + with closing(tarfile.open(output_tarfile, 'w')) as tarf: + FileOnDisk(self.input_ovf).add_to_archive(tarf) + with closing(tarfile.open(output_tarfile, 'r')) as tarf: + tarf.extract('input.ovf', self.temp_dir) + self.check_diff("", file2=os.path.join(self.temp_dir, 'input.ovf')) + + +class TestFileInTAR(COT_UT): + + """Test cases for FileInTAR class.""" + + def setUp(self): + """Test case setup function called automatically prior to each test.""" + super(TestFileInTAR, self).setUp() + self.tarfile = resource_filename(__name__, "test.tar") + self.valid_ref = FileInTAR(self.tarfile, "sample_cfg.txt") + + def test_nonexistent_tarfile(self): + """Test error handling when TAR file doesn't exist.""" + self.assertRaises(IOError, FileInTAR, "/foo/bar", "filename") + + def test_nonexistent_entry(self): + """Test error handling when filename isn't in the TAR.""" + self.assertRaises(IOError, FileInTAR, self.tarfile, "foo.bar") + + def test_not_tarfile(self): + """Test error handling when file is not a TAR file.""" + self.assertRaises(IOError, FileInTAR, self.input_ovf, self.input_ovf) + + def test_exists(self): + """Test the exists() API.""" + self.assertTrue(self.valid_ref.exists()) + # false case is covered in test_nonexistent_entry + + def test_size(self): + """Test the size() API.""" + self.assertEqual(self.valid_ref.size(), + os.path.getsize(resource_filename(__name__, + 'sample_cfg.txt'))) + + def test_open_close(self): + """Test the open() and close() APIs.""" + # open() only supports r/rb mode + self.assertRaises(ValueError, self.valid_ref.open, 'w') + self.assertRaises(ValueError, self.valid_ref.open, 'a') + + # No-op: + self.valid_ref.close() + + obj = self.valid_ref.open('r') + # Check that file contents are as expected + self.assertEqual(obj.readline(), b'!\n') + self.assertEqual(obj.readline(), b'interface GigabitEthernet0/0/0/0\n') + # TODO: this should clean up nicely + obj.close() + self.valid_ref.close() + self.assertRaises(ValueError, obj.read) + # No-op: + self.valid_ref.close() + + def test_copy_to(self): + """Test the copy_to() API.""" + self.valid_ref.copy_to(self.temp_dir) + self.check_diff("", + file1=resource_filename(__name__, 'sample_cfg.txt'), + file2=os.path.join(self.temp_dir, 'sample_cfg.txt')) + + def test_add_to_archive(self): + """Test the add_to_archive() API.""" + output_tarfile = os.path.join(self.temp_dir, 'test_output.tar') + with closing(tarfile.open(output_tarfile, 'w')) as tarf: + self.valid_ref.add_to_archive(tarf) + with closing(tarfile.open(output_tarfile, 'r')) as tarf: + tarf.extract('sample_cfg.txt', self.temp_dir) + self.check_diff("", + file1=resource_filename(__name__, 'sample_cfg.txt'), + file2=os.path.join(self.temp_dir, 'sample_cfg.txt'))