Skip to content

Commit

Permalink
Merge pull request #156 from kain88-de/cleanup
Browse files Browse the repository at this point in the history
Cleanup state handling -- clarify docs
  • Loading branch information
dotsdl committed Dec 6, 2017
2 parents cff589a + 3c7cfec commit 8d1dc4a
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 89 deletions.
66 changes: 12 additions & 54 deletions src/datreant/core/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
FileNotFoundError = IOError


class File(object):
class BaseFile(object):
"""Generic File object base class. Implements file locking and reloading
methods.
Expand All @@ -26,13 +26,16 @@ class File(object):
respectively. It handles any other low-level tasks for maintaining file
integrity.
Any child class will have to implement `_open_file_r` and `_open_file_w`.
These function should return a file like object that has a `close` method.
:Arguments:
*filename*
name of file on disk object corresponds to
"""

def __init__(self, filename, **kwargs):
def __init__(self, filename):
self.filename = os.path.abspath(filename)
self.handle = None
self.fd = None
Expand Down Expand Up @@ -76,15 +79,9 @@ def _shlock(self, fd):
:Arguments:
*fd*
file descriptor
:Returns:
*success*
True if shared lock successfully obtained
"""
fcntl.lockf(fd, fcntl.LOCK_SH)

return True

def _exlock(self, fd):
"""Get exclusive lock on file.
Expand All @@ -95,15 +92,9 @@ def _exlock(self, fd):
:Arguments:
*fd*
file descriptor
:Returns:
*success*
True if exclusive lock successfully obtained
"""
fcntl.lockf(fd, fcntl.LOCK_EX)

return True

def _unlock(self, fd):
"""Remove exclusive or shared lock on file.
Expand All @@ -115,15 +106,9 @@ def _unlock(self, fd):
:Arguments:
*fd*
file descriptor
:Returns:
*success*
True if lock removed
"""
fcntl.lockf(fd, fcntl.LOCK_UN)

return True

def _open_fd_r(self):
"""Open read-only file descriptor for application of advisory locks.
Expand Down Expand Up @@ -174,6 +159,12 @@ def _release_lock(self):
self._close_fd()
self.fdlock = None

def _open_file_r(self):
raise NotImplementedError

def _open_file_w(self):
raise NotImplementedError

@contextmanager
def read(self):
# if we already have any lock, proceed
Expand Down Expand Up @@ -205,39 +196,6 @@ def write(self):
self.handle.close()
self._release_lock()

def _open_r(self):
"""Open file with intention to write.
Not to be used except for debugging files.
"""
self._open_fd_r()
self._shlock(self.fd)
self.fdlock = 'shared'
self.handle = self._open_file_r()

def _open_w(self):
"""Open file with intention to write.
Not to be used except for debugging files.
"""
self._open_fd_rw()
self._exlock(self.fd)
self.fdlock = 'exclusive'
self.handle = self._open_file_w()

def _close(self):
"""Close file.
Not to be used except for debugging files.
"""
self.handle.close()
self._unlock(self.fd)
self.fdlock = None
self._close_fd()

def delete(self):
"""Delete this file and its proxy file.
Expand All @@ -249,7 +207,7 @@ def delete(self):
os.remove(self.proxy)


class FileSerial(File):
class FileSerial(BaseFile):
"""File object base class for serialization formats, such as JSON.
"""
Expand Down
66 changes: 31 additions & 35 deletions src/datreant/core/tests/test_locks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,33 @@
"""

import string
import multiprocessing as mp
import time
import pytest

from datreant.core import Treant


@pytest.fixture
def treant(tmpdir):
with tmpdir.as_cwd():
t = Treant('sprout')
return t


def pokefile(treantpath, string):
"""Add a number of tags to a Treant."""
treant = Treant(treantpath)
treant.tags.add(*["{}_{}".format(string, i) for i in range(100)])
treant.tags.add(* ["{}_{}".format(string, i) for i in range(100)])


def test_death_by_1000_pokes(treant):
pool = mp.Pool(processes=4)
for i in range(10):
pool.apply_async(pokefile, args=(treant.abspath, "run_{}".format(i)))
pool.close()
pool.join()

assert len(treant.tags) == 1000


def init_treant(tmpdir, tags):
Expand All @@ -22,38 +37,19 @@ def init_treant(tmpdir, tags):
return tf


class TestTreantFile:
def test_init_treant(tmpdir):
pool = mp.Pool(processes=4)
num = 73

@pytest.fixture
def treant(self, tmpdir):
with tmpdir.as_cwd():
t = Treant('sprout')
return t
# TODO: eventually want this to work without initing the treant
# here
init_treant(tmpdir, ['bark'])
for i in range(num):
pool.apply_async(init_treant, args=(tmpdir, ['run_{}'.format(i)]))
pool.close()
pool.join()

def test_death_by_1000_pokes(self, treant):
pool = mp.Pool(processes=4)
for i in range(10):
pool.apply_async(pokefile, args=(treant.abspath,
"run_{}".format(i)))
pool.close()
pool.join()

assert len(treant.tags) == 1000

def test_init_treant(self, tmpdir):
pool = mp.Pool(processes=4)
num = 73

# TODO: eventually want this to work without initing the treant
# here
init_treant(tmpdir, ['bark'])
for i in range(num):
pool.apply_async(init_treant, args=(tmpdir,
['run_{}'.format(i)]))
pool.close()
pool.join()

with tmpdir.as_cwd():
tf = Treant('sprout')
with tmpdir.as_cwd():
tf = Treant('sprout')

assert len(tf.tags) == num + 1
assert len(tf.tags) == num + 1

0 comments on commit 8d1dc4a

Please sign in to comment.