Skip to content

Commit

Permalink
Avoid leaking files in tests
Browse files Browse the repository at this point in the history
  • Loading branch information
amol- committed Feb 21, 2024
1 parent 8e3113e commit b1b59e6
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 75 deletions.
20 changes: 13 additions & 7 deletions tests/test_fields_basic.py
Expand Up @@ -5,13 +5,16 @@
from sqlalchemy.schema import Column
from sqlalchemy.types import Unicode, Integer
from .base_sqla import setup_database, clear_database, DeclarativeBase, DBSession
from .utils import OpenFiles
from depot.fields.sqlalchemy import UploadedFileField
from depot.fields.upload import UploadedFile
from depot.manager import DepotManager
from depot.fields.interfaces import FileFilter

from depot._compat import u_, bytes_

of = OpenFiles()

def setUpModule():
setup_database()

Expand Down Expand Up @@ -62,9 +65,12 @@ def tearDownClass(cls):
def setUp(self):
clear_database()

def tearDown(self):
of.close_all()

def test_column_is_dictlike(self):
doc = SimpleDocument(name=u_('Foo'))
doc.content = open(self.fake_file.name, 'rb')
doc.content = of.open(self.fake_file.name, 'rb')
DBSession.add(doc)
DBSession.flush()
DBSession.commit()
Expand All @@ -78,7 +84,7 @@ def test_column_is_dictlike(self):

def test_column_cannot_edit_after_save(self):
doc = SimpleDocument(name=u_('Foo'))
doc.content = open(self.fake_file.name, 'rb')
doc.content = of.open(self.fake_file.name, 'rb')
DBSession.add(doc)
DBSession.flush()
DBSession.commit()
Expand All @@ -89,7 +95,7 @@ def test_column_cannot_edit_after_save(self):

def test_column_cannot_delete_after_save(self):
doc = SimpleDocument(name=u_('Foo'))
doc.content = open(self.fake_file.name, 'rb')
doc.content = of.open(self.fake_file.name, 'rb')
DBSession.add(doc)
DBSession.flush()
DBSession.commit()
Expand All @@ -100,7 +106,7 @@ def test_column_cannot_delete_after_save(self):

def test_column_cannot_edit_attr_after_save(self):
doc = SimpleDocument(name=u_('Foo'))
doc.content = open(self.fake_file.name, 'rb')
doc.content = of.open(self.fake_file.name, 'rb')
DBSession.add(doc)
DBSession.flush()
DBSession.commit()
Expand All @@ -111,7 +117,7 @@ def test_column_cannot_edit_attr_after_save(self):

def test_column_cannot_delete_attr_after_save(self):
doc = SimpleDocument(name=u_('Foo'))
doc.content = open(self.fake_file.name, 'rb')
doc.content = of.open(self.fake_file.name, 'rb')
DBSession.add(doc)
DBSession.flush()
DBSession.commit()
Expand All @@ -123,8 +129,8 @@ def test_column_cannot_delete_attr_after_save(self):
def test_storage_does_not_exists(self):
doc = SimpleDocument(name=u_('Foo'))
with self.assertRaises(ValueError):
doc.content = UploadedFile(open(self.fake_file.name, 'rb'),
'missing_storage')
doc.content = UploadedFile(of.open(self.fake_file.name, 'rb'),
'missing_storage')
DBSession.add(doc)
DBSession.flush()
DBSession.commit()
79 changes: 45 additions & 34 deletions tests/test_fields_ming.py
Expand Up @@ -10,10 +10,12 @@
from ming.odm.declarative import MappedClass
from ming.odm import FieldProperty
from ming import schema as s, Field
from .utils import create_cgifs
from .utils import create_cgifs, OpenFiles
from depot.fields.specialized.image import UploadedImageWithThumb
from depot._compat import u_

of = OpenFiles()


def setUpModule():
setup_database()
Expand Down Expand Up @@ -45,22 +47,22 @@ class __mongometa__:
class TestMingAttachments(unittest.TestCase):
def setUp(self):
self.file_content = b'this is the file content'
self.fake_file = tempfile.NamedTemporaryFile()
self.fake_file = of.add(tempfile.NamedTemporaryFile())
self.fake_file.write(self.file_content)
self.fake_file.flush()
self.fake_file.seek(0)
clear_database()

def tearDown(self):
self.fake_file.close()
of.close_all()

def test_accessing_class_property(self):
# This is to check for regression in a bug in property descriptor
prop = Document.content
assert isinstance(prop, FieldProperty), prop

def test_create_fromfile(self):
doc = Document(name='Foo', content = open(self.fake_file.name, 'rb'))
doc = Document(name='Foo', content = of.open(self.fake_file.name, 'rb'))
DBSession.flush()
DBSession.clear()

Expand All @@ -69,7 +71,7 @@ def test_create_fromfile(self):
assert d.content.file.filename == os.path.basename(self.fake_file.name)

def test_create_fromfile_flush_single_document(self):
doc = Document(name='Foo', content = open(self.fake_file.name, 'rb'))
doc = Document(name='Foo', content = of.open(self.fake_file.name, 'rb'))
DBSession.flush(doc)
DBSession.clear()

Expand All @@ -79,8 +81,9 @@ def test_create_fromfile_flush_single_document(self):

def test_edit_existing(self):
doc = Document(name=u_('Foo2'))
doc.content = open(self.fake_file.name, 'rb')
DBSession.flush()
with open(self.fake_file.name, 'rb') as f:
doc.content = f
DBSession.flush()
DBSession.clear()

d = Document.query.find(dict(name='Foo2')).first()
Expand All @@ -102,8 +105,9 @@ def test_edit_existing(self):

def test_edit_existing_rollback(self):
doc = Document(name=u_('Foo3'))
doc.content = open(self.fake_file.name, 'rb')
DBSession.flush()
with open(self.fake_file.name, 'rb') as f:
doc.content = f
DBSession.flush()
DBSession.clear()


Expand All @@ -125,6 +129,7 @@ def test_edit_existing_rollback(self):

def test_create_fromfield(self):
field = create_cgifs('image/jpeg', self.fake_file, 'test.jpg')
of.add(field.file)

doc = Document(name=u_('Foo'), content=field)
DBSession.flush()
Expand All @@ -146,8 +151,9 @@ def test_create_empty(self):

def test_delete_existing(self):
doc = Document(name=u_('Foo2'))
doc.content = open(self.fake_file.name, 'rb')
DBSession.flush()
with open(self.fake_file.name, 'rb') as f:
doc.content = f
DBSession.flush()
DBSession.clear()


Expand All @@ -163,8 +169,9 @@ def test_delete_existing(self):

def test_delete_existing_rollback(self):
doc = Document(name=u_('Foo3'))
doc.content = open(self.fake_file.name, 'rb')
DBSession.flush()
with open(self.fake_file.name, 'rb') as f:
doc.content = f
DBSession.flush()
DBSession.clear()


Expand All @@ -176,8 +183,9 @@ def test_delete_existing_rollback(self):
assert get_file(old_file).read() == self.file_content

def test_create_with_alias(self):
doc = Document(name='Foo', targeted_content=open(self.fake_file.name, 'rb'))
DBSession.flush()
with open(self.fake_file.name, 'rb') as f:
doc = Document(name='Foo', targeted_content=f)
DBSession.flush()
DBSession.clear()

d = Document.query.find(dict(name='Foo')).first()
Expand All @@ -190,19 +198,20 @@ class TestMingImageAttachments(unittest.TestCase):
def setUp(self):
self.file_content = b'''R0lGODlhEQAUAPcAAC4uLjAwMDIyMjMzMjQ0NDU1NDY2Njk2Mzg4ODo6Oj49Ozw8PD4+PkE+OEA/PkhAN0tCNk5JPFFGNV1KMFhNNVFHOFJJPVVLPVhOPXZfKXVcK2ZQNGZXNGtZMnNcNHZeNnldMHJfOn1hKXVjOH9oO0BAQEJCQkREREVFREZGRklGQ05KQ0hISEpKSkxMTE5OTlZRSlFQT19XSFBQUFJSUlRUVGFUQmFVQ2ZZQGtdQnNiQqJ/HI1uIYBnLIllKoZrK4FqLoVqL4luLIpsLpt7J515JJ50KZhzLYFnMIFlM4ZlMIFkNI1uNoJoOoVrPIlvO49yMolwPpB2O5p4Op98PaB3IKN4JqN8J6h7I6J5LaZ+LLF+ILGGG7+JG72OGLKEI7aHIrOEJL2JI7mMN76YNcGJG8SOG8WLHMONH86eEs+aFsGSG8eQHMySG9uVFduXFdeeE9eaFdScF96YE9yaFOKcEuOdEtWgFNiiEduhE96pEuqlD+qmD+KpD+yoDu6rDuysDvCuDfCvDeuwD/SzC/a2CvGwDfKxDPi5Cfi5Cvq8CeCjEuehEOagEeijEOKoEOStFMOOK8+TLM6YNNChItGgLtylKt6gMNqgON6jPOChLfi/JOSrNeGvN9KhRtykRNWkSOCnQOCpSOawQue1T+a6Su67SOGsUO/AVAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAACH5BAAAAAAALAAAAAARABQAAAj+AAEIHEiwoMAACBMqXIhQgMOHDwdAfEigYsUCT6KQScNjxwGLBAyINPBhCilUmxIV6uMlw0gEMJeMGWWqFCU8hA4JEgETQYIEDcRw6qRlwgMIGvJwkfAzwYIFXQBB8qHg6VMKFawuYNBBjaIqDhhI+cGgrNmyJXooGrShBJBKcIpwiFCibl0TehDdsWBCiBxLZuIwolMGiwcTJ4gUenThBAokSVRgGFJnzhYQJ1KsyRkkhWfPK87McWPEM4sRhgItCsGitQ5PmtxYUdK6BY4rf/zwYRNmUihRpyQdaUHchQsoX/Y4amTnUqZPoG7YMO7ihfUcYNC0eRMJExMqMKweW59BfkYMEk2ykHAio3x5GvDjy58Pv4b9+/jz2w8IADs='''
self.file_content = base64.b64decode(self.file_content)
self.fake_file = tempfile.NamedTemporaryFile()
self.fake_file = of.add(tempfile.NamedTemporaryFile())
self.fake_file.write(self.file_content)
self.fake_file.flush()
self.fake_file.seek(0)
clear_database()

def tearDown(self):
self.fake_file.close()
of.close_all()

def test_create_fromfile(self):
doc = Document(name=u_('Foo'))
doc.photo = open(self.fake_file.name, 'rb')
DBSession.flush()
with open(self.fake_file.name, 'rb') as f:
doc.photo = f
DBSession.flush()
DBSession.clear()

d = Document.query.find(dict(name='Foo')).first()
Expand All @@ -212,10 +221,10 @@ def test_create_fromfile(self):
def test_create_fromfield(self):
field = cgi.FieldStorage()
field.filename = u_('àèìòù.gif')
field.file = open(self.fake_file.name, 'rb')

doc = Document(name=u_('Foo'), photo=field)
DBSession.flush()
with open(self.fake_file.name, 'rb') as f:
field.file = f
doc = Document(name=u_('Foo'), photo=field)
DBSession.flush()
DBSession.clear()

d = Document.query.find(dict(name='Foo')).first()
Expand All @@ -227,6 +236,7 @@ def test_create_fromfield(self):

def test_thumbnail(self):
field = create_cgifs('image/gif', self.fake_file, 'test.gif')
of.add(field.file)

doc = Document(name=u_('Foo'), photo=field)
DBSession.flush()
Expand All @@ -241,8 +251,8 @@ def test_thumbnail(self):

def test_public_url(self):
doc = Document(name=u_('Foo'))
doc.photo = open(self.fake_file.name, 'rb')
doc.content = open(self.fake_file.name, 'rb')
doc.photo = of.open(self.fake_file.name, 'rb')
doc.content = of.open(self.fake_file.name, 'rb')
DBSession.flush()
DBSession.clear()

Expand All @@ -261,7 +271,7 @@ def test_rollback(self):
self.skipTest("Currently Ming Doesn't provide a way to handle discarded documents")

doc = Document(name=u_('Foo3'))
doc.photo = open(self.fake_file.name, 'rb')
doc.photo = of.open(self.fake_file.name, 'rb')

uploaded_file = doc.photo.path
uploaded_thumb = doc.photo.thumb_path
Expand All @@ -285,18 +295,18 @@ class TestMingThumbnailFilter(unittest.TestCase):
def setUp(self):
self.file_content = b'''R0lGODlhEQAUAPcAAC4uLjAwMDIyMjMzMjQ0NDU1NDY2Njk2Mzg4ODo6Oj49Ozw8PD4+PkE+OEA/PkhAN0tCNk5JPFFGNV1KMFhNNVFHOFJJPVVLPVhOPXZfKXVcK2ZQNGZXNGtZMnNcNHZeNnldMHJfOn1hKXVjOH9oO0BAQEJCQkREREVFREZGRklGQ05KQ0hISEpKSkxMTE5OTlZRSlFQT19XSFBQUFJSUlRUVGFUQmFVQ2ZZQGtdQnNiQqJ/HI1uIYBnLIllKoZrK4FqLoVqL4luLIpsLpt7J515JJ50KZhzLYFnMIFlM4ZlMIFkNI1uNoJoOoVrPIlvO49yMolwPpB2O5p4Op98PaB3IKN4JqN8J6h7I6J5LaZ+LLF+ILGGG7+JG72OGLKEI7aHIrOEJL2JI7mMN76YNcGJG8SOG8WLHMONH86eEs+aFsGSG8eQHMySG9uVFduXFdeeE9eaFdScF96YE9yaFOKcEuOdEtWgFNiiEduhE96pEuqlD+qmD+KpD+yoDu6rDuysDvCuDfCvDeuwD/SzC/a2CvGwDfKxDPi5Cfi5Cvq8CeCjEuehEOagEeijEOKoEOStFMOOK8+TLM6YNNChItGgLtylKt6gMNqgON6jPOChLfi/JOSrNeGvN9KhRtykRNWkSOCnQOCpSOawQue1T+a6Su67SOGsUO/AVAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAACH5BAAAAAAALAAAAAARABQAAAj+AAEIHEiwoMAACBMqXIhQgMOHDwdAfEigYsUCT6KQScNjxwGLBAyINPBhCilUmxIV6uMlw0gEMJeMGWWqFCU8hA4JEgETQYIEDcRw6qRlwgMIGvJwkfAzwYIFXQBB8qHg6VMKFawuYNBBjaIqDhhI+cGgrNmyJXooGrShBJBKcIpwiFCibl0TehDdsWBCiBxLZuIwolMGiwcTJ4gUenThBAokSVRgGFJnzhYQJ1KsyRkkhWfPK87McWPEM4sRhgItCsGitQ5PmtxYUdK6BY4rf/zwYRNmUihRpyQdaUHchQsoX/Y4amTnUqZPoG7YMO7ihfUcYNC0eRMJExMqMKweW59BfkYMEk2ykHAio3x5GvDjy58Pv4b9+/jz2w8IADs='''
self.file_content = base64.b64decode(self.file_content)
self.fake_file = tempfile.NamedTemporaryFile()
self.fake_file = of.add(tempfile.NamedTemporaryFile())
self.fake_file.write(self.file_content)
self.fake_file.flush()
self.fake_file.seek(0)
clear_database()

def tearDown(self):
self.fake_file.close()
of.close_all()

def test_create_fromfile(self):
doc = Document(name=u_('Foo'))
doc.second_photo = open(self.fake_file.name, 'rb')
doc.second_photo = of.open(self.fake_file.name, 'rb')
DBSession.flush()
DBSession.clear()

Expand All @@ -307,7 +317,7 @@ def test_create_fromfile(self):
def test_create_fromfield(self):
field = cgi.FieldStorage()
field.filename = u_('àèìòù.gif')
field.file = open(self.fake_file.name, 'rb')
field.file = of.open(self.fake_file.name, 'rb')

doc = Document(name=u_('Foo'), second_photo=field)
DBSession.flush()
Expand All @@ -322,6 +332,7 @@ def test_create_fromfield(self):

def test_thumbnail(self):
field = create_cgifs('image/gif', self.fake_file, 'test.gif')
of.add(field.file)

doc = Document(name=u_('Foo'), second_photo=field)
DBSession.flush()
Expand All @@ -339,8 +350,8 @@ def test_thumbnail(self):

def test_public_url(self):
doc = Document(name=u_('Foo'))
doc.second_photo = open(self.fake_file.name, 'rb')
doc.content = open(self.fake_file.name, 'rb')
doc.second_photo = of.open(self.fake_file.name, 'rb')
doc.content = of.open(self.fake_file.name, 'rb')
DBSession.flush()
DBSession.clear()

Expand All @@ -358,7 +369,7 @@ def test_rollback(self):
self.skipTest("Currently Ming Doesn't provide a way to handle discarded documents")

doc = Document(name=u_('Foo3'))
doc.second_photo = open(self.fake_file.name, 'rb')
doc.second_photo = of.open(self.fake_file.name, 'rb')

uploaded_file = doc.second_photo.path
uploaded_thumb = doc.second_photo.thumb_12x12_path
Expand All @@ -375,4 +386,4 @@ def test_rollback(self):
fold = get_file(uploaded_thumb)
assert False, 'Should have raised IOError here'
except IOError:
pass
pass

0 comments on commit b1b59e6

Please sign in to comment.