Skip to content

Commit

Permalink
Merge pull request #330 from wjjmjh/196_io_writers_check_destination
Browse files Browse the repository at this point in the history
#196, io writers should check destination more thoroughly before deleting
  • Loading branch information
GavinHuttley committed Nov 12, 2019
2 parents f757046 + 429a5b6 commit 28e17b5
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 1 deletion.
25 changes: 25 additions & 0 deletions src/cogent3/app/data_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -566,11 +566,24 @@ def __init__(
self._persistent = {k: v for k, v in d.items() if k != "self"}
self.mode = mode

def _has_other_suffixes(self, path, suffix):
p = Path(path)
for f in p.iterdir():
if get_format_suffixes(str(f))[0] != suffix:
return True
return False

def _source_create_delete(self, if_exists, create):
exists = os.path.exists(self.source)
if exists and if_exists == RAISE:
raise RuntimeError(f"'{self.source}' exists")
elif exists and if_exists == OVERWRITE:
if self._has_other_suffixes(self.source, self.suffix):
raise RuntimeError(
f"Unsafe to delete {self.source} as it contains ",
f"files other than {self.suffix}."
" You will need to remove this directly yourself.",
)
try:
shutil.rmtree(self.source)
except NotADirectoryError:
Expand Down Expand Up @@ -635,12 +648,24 @@ def __init__(
self._persistent = {k: v for k, v in d.items() if k != "self"}
self.mode = "a" or mode

def _has_other_suffixes(self, path, suffix):
for f in zipfile.ZipFile(path).namelist():
if get_format_suffixes(f)[0] != suffix:
return True
return False

def _source_create_delete(self, if_exists, create):
exists = os.path.exists(self.source)
dirname = os.path.dirname(self.source)
if exists and if_exists == RAISE:
raise RuntimeError(f"'{self.source}' exists")
elif exists and if_exists == OVERWRITE:
if self._has_other_suffixes(self.source, self.suffix):
raise RuntimeError(
f"Unsafe to delete {self.source} as it contains ",
f"files other than {self.suffix}."
" You will need to remove this directly yourself.",
)
os.remove(self.source)
elif dirname and not os.path.exists(dirname) and not create:
raise RuntimeError(f"'{dirname}' does not exist")
Expand Down
57 changes: 56 additions & 1 deletion tests/test_app/test_data_store.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import os
import shutil
import sys
import zipfile

from tempfile import TemporaryDirectory
from unittest import TestCase, main, skipIf

from cogent3.app.data_store import (
OVERWRITE,
DataStoreMember,
ReadOnlyDirectoryDataStore,
ReadOnlyTinyDbDataStore,
Expand All @@ -14,7 +16,6 @@
WritableDirectoryDataStore,
WritableTinyDbDataStore,
WritableZippedDataStore,
make_record_for_json,
)
from cogent3.parse.fasta import MinimalFastaParser

Expand Down Expand Up @@ -276,6 +277,35 @@ class DirectoryDataStoreTests(TestCase, DataStoreBaseTests):
ReadClass = ReadOnlyDirectoryDataStore
WriteClass = WritableDirectoryDataStore

def test_write_class_source_create_delete(self):
with TemporaryDirectory(dir=".") as dirname:
# tests the case when the directory has the file with the same suffix to self.suffix
path = os.path.join(dirname, "delme_dir")
os.mkdir(path)
with open(
os.path.join(path, "test_write_class_source_create_delete.json"), "w"
):
pass
dstore = self.WriteClass(
path, suffix=".json", if_exists=OVERWRITE, create=True
)
self.assertEqual(len(dstore), 0)
# tests the case when the directory has the file with the different suffix to self.suffix
with open(
os.path.join(path, "test_write_class_source_create_delete.dummySuffix"),
"w",
):
pass
with self.assertRaises(RuntimeError):
dstore = self.WriteClass(
path, suffix=".json", if_exists=OVERWRITE, create=True
)
# tests the case when the directory has the file with the same suffix to self.suffix
dstore = self.WriteClass(
path, suffix=".dummySuffix", if_exists=OVERWRITE, create=True
)
self.assertEqual(len(dstore), 0)


class ZippedDataStoreTests(TestCase, DataStoreBaseTests):
basedir = "data.zip"
Expand All @@ -302,6 +332,31 @@ def test_store_suffix(self):
self.assertEqual(dstore.source, self.basedir)
self.assertTrue(len(dstore) > 1)

def test_write_class_source_create_delete(self):
with TemporaryDirectory(dir=".") as dirname:
path = os.path.join(dirname, "delme_dir")
os.mkdir(path)
with zipfile.ZipFile(os.path.join(path, self.basedir), "w") as myzip:
with open("dummyPrefix_.dummySuffix", "w"):
pass
myzip.write("dummyPrefix_.dummySuffix")
# tests the case when the ZippedDataStore has other different suffixes to self.suffix
with self.assertRaises(RuntimeError):
dstore = self.WriteClass(
os.path.join(path, self.basedir),
suffix=".json",
if_exists=OVERWRITE,
create=True,
)
# tests the case when the ZippedDataStore only contains files with the same suffix as self.suffix
with zipfile.ZipFile("delme.zip", "w") as myzip:
with open("dummyPrefix_.json", "w"):
pass
myzip.write("dummyPrefix_.json")
dstore = self.WriteClass(
"delme.zip", suffix=".json", if_exists=OVERWRITE, create=True
)


class TinyDBDataStoreTests(TestCase):
basedir = "data"
Expand Down

0 comments on commit 28e17b5

Please sign in to comment.