Skip to content

Commit

Permalink
Added UT automation for file_reference module.
Browse files Browse the repository at this point in the history
  • Loading branch information
glennmatthews committed Aug 5, 2015
1 parent af81c8d commit 8256e85
Show file tree
Hide file tree
Showing 4 changed files with 179 additions and 29 deletions.
32 changes: 17 additions & 15 deletions COT/file_reference.py
Expand Up @@ -38,12 +38,18 @@ def __init__(self, file_path, filename=None):
else:
self.file_path = os.path.join(file_path, filename)
self.filename = filename
if not self.exists():
raise IOError("File {0} does not exist!".format(self.file_path))
self.obj = None

def exists(self):
"""Check whether the file exists on disk."""
return os.path.exists(self.file_path)

def size(self):
"""Get the size of this file, in bytes."""
return os.path.getsize(self.file_path)

def open(self, mode):
"""Open the file and return a reference to the file object."""
self.obj = open(self.file_path, mode)
Expand All @@ -60,12 +66,6 @@ def copy_to(self, dest_dir):
logger.info("Copying {0} to {1}".format(self.file_path, dest_dir))
shutil.copy(self.file_path, dest_dir)

def size(self):
"""Get the size of this file, in bytes."""
if self.exists():
return os.path.getsize(self.file_path)
return None

def add_to_archive(self, tarf):
"""Copy this file into the given tarfile object."""
logger.info("Adding {0} to TAR file as {1}"
Expand All @@ -79,9 +79,14 @@ class FileInTAR(object):

def __init__(self, tarfile_path, filename):
"""Create a reference to a file contained in a TAR archive."""
if not tarfile.is_tarfile(tarfile_path):
raise IOError("{0} is not a valid TAR file.".format(tarfile_path))
self.tarfile_path = tarfile_path
self.file_path = None
self.filename = filename
if not self.exists():
raise IOError("{0} does not exist in {1}"
.format(filename, tarfile_path))
self.file_path = None
self.tarf = None
self.obj = None

Expand All @@ -94,6 +99,11 @@ def exists(self):
except KeyError:
return False

def size(self):
"""Get the size of this file in bytes."""
with closing(tarfile.open(self.tarfile_path, 'r')) as tarf:
return tarf.getmember(self.filename).size

def open(self, mode):
"""Open the TAR and return a reference to the relevant file object."""
# We can only extract a file object from a TAR file in read mode.
Expand All @@ -119,14 +129,6 @@ def copy_to(self, dest_dir):
.format(self.filename, self.tarfile_path, dest_dir))
tarf.extract(self.filename, dest_dir)

def size(self):
"""Get the size of this file in bytes."""
with closing(tarfile.open(self.tarfile_path, 'r')) as tarf:
try:
return tarf.getmember(self.filename).size
except KeyError:
return None

def add_to_archive(self, tarf):
"""Copy this file into the given tarfile object."""
self.open('r')
Expand Down
31 changes: 17 additions & 14 deletions COT/ovf.py
Expand Up @@ -316,22 +316,22 @@ def __init__(self, input_file, output_file):
# make sure they look sane, and store file references for later.
file_list = [f.get(self.FILE_HREF) for f in
self.references.findall(self.FILE)]
self.invalid_files = []
if self.input_file == self.ovf_descriptor:
# Check files in the directory referenced by the OVF descriptor
input_dir = os.path.dirname(self.ovf_descriptor)
for f in file_list:
self._file_references[f] = FileOnDisk(input_dir, f)
input_path = os.path.dirname(self.ovf_descriptor)
ref_cls = FileOnDisk
else:
# OVA - check contents of TAR file.
for f in file_list:
self._file_references[f] = FileInTAR(self.input_file, f)
input_path = self.input_file
ref_cls = FileInTAR

for ref in self._file_references.values():
if not ref.exists():
for f in file_list:
try:
self._file_references[f] = ref_cls(input_path, f)
except IOError:
logger.error("File '{0}' referenced in the OVF descriptor "
"does not exist.".format(f))
self.invalid_files.append(f)
self._file_references[f] = None

except Exception as e:
self.destroy()
Expand Down Expand Up @@ -626,11 +626,14 @@ def validate_and_update_file_references(self):
href = file_elem.get(self.FILE_HREF)
file_ref = self._file_references[href]

if not file_ref.exists():
if href not in self.invalid_files:
# otherwise we already warned about it at read time.
logger.error("Referenced file '{0}' does not exist!"
.format(href))
if file_ref is not None and not file_ref.exists():
# file used to exist but no longer does??
logger.error("Referenced file '{0}' does not exist!"
.format(href))
self._file_references[href] = None
file_ref = None

if file_ref is None:
# TODO this should probably have a confirm() check...
logger.warning("Removing reference to missing file {0}"
.format(href))
Expand Down
Binary file added COT/tests/test.tar
Binary file not shown.
145 changes: 145 additions & 0 deletions COT/tests/test_file_reference.py
@@ -0,0 +1,145 @@
#!/usr/bin/env python
#
# test_file_reference.py - Unit test cases for COT file reference handling
#
# August 2015, Glenn F. Matthews
# Copyright (c) 2015 the COT project developers.
# See the COPYRIGHT.txt file at the top-level directory of this distribution
# and at https://github.com/glennmatthews/cot/blob/master/COPYRIGHT.txt.
#
# This file is part of the Common OVF Tool (COT) project.
# It is subject to the license terms in the LICENSE.txt file found in the
# top-level directory of this distribution and at
# https://github.com/glennmatthews/cot/blob/master/LICENSE.txt. No part
# of COT, including this file, may be copied, modified, propagated, or
# distributed except according to the terms contained in the LICENSE.txt file.

"""Unit test cases for COT.file_reference classes."""

import os
import tarfile

from contextlib import closing
from pkg_resources import resource_filename

from COT.tests.ut import COT_UT
from COT.file_reference import FileOnDisk, FileInTAR


class TestFileOnDisk(COT_UT):

"""Test cases for FileOnDisk class."""

def test_nonexistent_file(self):
"""Test error handling when the file doesn't exist."""
self.assertRaises(IOError, FileOnDisk, "/foo/bar.txt")
self.assertRaises(IOError, FileOnDisk, "/foo", "bar.txt")

def test_exists(self):
"""Test the exists() API."""
self.assertTrue(FileOnDisk(self.input_ovf).exists())
# false case is covered by test_nonexistent_file

def test_size(self):
"""Test the size() API."""
self.assertEqual(FileOnDisk(self.input_ovf).size(),
os.path.getsize(self.input_ovf))

def test_open_close(self):
"""Test the open() and close() APIs."""
ref = FileOnDisk(self.input_ovf)
ref_obj = ref.open('r')
try:
file_obj = open(self.input_ovf, 'r')
try:
self.assertEqual(file_obj.readline(), ref_obj.readline())
finally:
file_obj.close()
finally:
ref.close()

def test_copy_to(self):
"""Test the copy_to() API."""
FileOnDisk(self.input_ovf).copy_to(self.temp_dir)
self.check_diff("", file2=os.path.join(self.temp_dir, 'input.ovf'))

def test_add_to_archive(self):
"""Test the add_to_archive() API."""
output_tarfile = os.path.join(self.temp_dir, 'test_output.tar')
with closing(tarfile.open(output_tarfile, 'w')) as tarf:
FileOnDisk(self.input_ovf).add_to_archive(tarf)
with closing(tarfile.open(output_tarfile, 'r')) as tarf:
tarf.extract('input.ovf', self.temp_dir)
self.check_diff("", file2=os.path.join(self.temp_dir, 'input.ovf'))


class TestFileInTAR(COT_UT):

"""Test cases for FileInTAR class."""

def setUp(self):
"""Test case setup function called automatically prior to each test."""
super(TestFileInTAR, self).setUp()
self.tarfile = resource_filename(__name__, "test.tar")
self.valid_ref = FileInTAR(self.tarfile, "sample_cfg.txt")

def test_nonexistent_tarfile(self):
"""Test error handling when TAR file doesn't exist."""
self.assertRaises(IOError, FileInTAR, "/foo/bar", "filename")

def test_nonexistent_entry(self):
"""Test error handling when filename isn't in the TAR."""
self.assertRaises(IOError, FileInTAR, self.tarfile, "foo.bar")

def test_not_tarfile(self):
"""Test error handling when file is not a TAR file."""
self.assertRaises(IOError, FileInTAR, self.input_ovf, self.input_ovf)

def test_exists(self):
"""Test the exists() API."""
self.assertTrue(self.valid_ref.exists())
# false case is covered in test_nonexistent_entry

def test_size(self):
"""Test the size() API."""
self.assertEqual(self.valid_ref.size(),
os.path.getsize(resource_filename(__name__,
'sample_cfg.txt')))

def test_open_close(self):
"""Test the open() and close() APIs."""
# open() only supports r/rb mode
self.assertRaises(ValueError, self.valid_ref.open, 'w')
self.assertRaises(ValueError, self.valid_ref.open, 'a')

# No-op:
self.valid_ref.close()

obj = self.valid_ref.open('r')
# Check that file contents are as expected
self.assertEqual(obj.readline(), b'!\n')
self.assertEqual(obj.readline(), b'interface GigabitEthernet0/0/0/0\n')
# TODO: this should clean up nicely
obj.close()
self.valid_ref.close()
self.assertRaises(ValueError, obj.read)
# No-op:
self.valid_ref.close()

def test_copy_to(self):
"""Test the copy_to() API."""
self.valid_ref.copy_to(self.temp_dir)
self.check_diff("",
file1=resource_filename(__name__, 'sample_cfg.txt'),
file2=os.path.join(self.temp_dir, 'sample_cfg.txt'))

def test_add_to_archive(self):
"""Test the add_to_archive() API."""
output_tarfile = os.path.join(self.temp_dir, 'test_output.tar')
with closing(tarfile.open(output_tarfile, 'w')) as tarf:
self.valid_ref.add_to_archive(tarf)
with closing(tarfile.open(output_tarfile, 'r')) as tarf:
tarf.extract('sample_cfg.txt', self.temp_dir)
self.check_diff("",
file1=resource_filename(__name__, 'sample_cfg.txt'),
file2=os.path.join(self.temp_dir, 'sample_cfg.txt'))

0 comments on commit 8256e85

Please sign in to comment.