Skip to content

Commit

Permalink
Merge pull request #117 from theferrit32/112-fasta-thread-safety
Browse files Browse the repository at this point in the history
This PR addresses two issues:

1. Explicitly closes files. Without this, file descriptors will likely be closed only upon garbage collection, but that might not be soon enough to avoid fd exhaustion. @theferrit32 had good data to support this hypothesis.

2. Wrap files in a lock to prevent two threads from competing for file seeks. This caused issues like the following:
```
[E::bgzf_uncompress] Inflate operation failed: invalid distance too far back
[E::fai_retrieve] Failed to retrieve block. (Seeking in a compressed, .gzi unindexed, file?)
```
  • Loading branch information
reece committed Nov 13, 2023
2 parents be60406 + 701f0b3 commit 48958d7
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 8 deletions.
3 changes: 3 additions & 0 deletions src/biocommons/seqrepo/fastadir/fabgz.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ class FabgzReader(object):
def __init__(self, filename):
self._fh = FastaFile(filename)

def __del__(self):
self._fh.close()

def fetch(self, seq_id, start=None, end=None):
return self._fh.fetch(seq_id.encode("ascii"), start, end)

Expand Down
50 changes: 44 additions & 6 deletions src/biocommons/seqrepo/fastadir/fastadir.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import contextlib
import threading
import datetime
import functools
import importlib.resources
Expand Down Expand Up @@ -27,6 +29,24 @@
expected_schema_version = 1


class LockableFabgzReader(contextlib.AbstractContextManager):
"""
Class that implements ContextManager and wraps a FabgzReader.
The FabgzReader is returned when acquired in a contextmanager with statement.
"""

def __init__(self, path):
self.lock = threading.Lock()
self.fabgz_reader = FabgzReader(path)

def __enter__(self):
self.lock.acquire()
return self.fabgz_reader

def __exit__(self, exc_type, exc_value, traceback):
self.lock.release()


class FastaDir(BaseReader, BaseWriter):
"""This class provides simple a simple key-value interface to a
directory of compressed fasta files.
Expand Down Expand Up @@ -80,6 +100,9 @@ def __init__(self, root_dir, writeable=False, check_same_thread=True):
)
)

def __del__(self):
self._db.close()

# ############################################################################
# Special methods

Expand All @@ -99,6 +122,7 @@ def __iter__(self):
recd = dict(rec)
recd["seq"] = self.fetch(rec["seq_id"])
yield recd
cursor.close()

def __len__(self):
return self.stats()["n_sequences"]
Expand Down Expand Up @@ -126,8 +150,10 @@ def fetch(self, seq_id, start=None, end=None):
self.commit()

path = os.path.join(self._root_dir, rec["relpath"])
fabgz = self._open_for_reading(path)
return fabgz.fetch(seq_id, start, end)

with self._open_for_reading(path) as fabgz:
seq = fabgz.fetch(seq_id, start, end)
return seq

@functools.lru_cache(maxsize=SEQREPO_LRU_CACHE_MAXSIZE)
def fetch_seqinfo(self, seq_id):
Expand Down Expand Up @@ -178,7 +204,7 @@ def store(self, seq_id, seq):
os.makedirs(dir_, exist_ok=True)
fabgz = FabgzWriter(path)
self._writing = {"relpath": relpath, "fabgz": fabgz}
_logger.debug("Opened for writing: " + path)
_logger.debug("Opened for writing: %s", path)

self._writing["fabgz"].store(seq_id, seq)
alpha = "".join(sorted(set(seq)))
Expand All @@ -188,6 +214,7 @@ def store(self, seq_id, seq):
values (?, ?, ?,?)""",
(seq_id, len(seq), alpha, self._writing["relpath"]),
)
cursor.close()
return seq_id

# ############################################################################
Expand All @@ -196,7 +223,9 @@ def store(self, seq_id, seq):
def _fetch_one(self, sql, params=()):
cursor = self._db.cursor()
cursor.execute(sql, params)
return cursor.fetchone()
val = cursor.fetchone()
cursor.close()
return val

def _upgrade_db(self):
"""upgrade db using scripts for specified (current) schema version"""
Expand All @@ -211,8 +240,16 @@ def _upgrade_db(self):

@functools.lru_cache()
def _open_for_reading(self, path):
_logger.debug("Opening for reading: " + path)
return FabgzReader(path)
"""
Opens a FabgzReader to path, wraps in a LockableFabgzReader for use in context managers.
Places it in an LRU cache so file is only opened once per FastaDir object. Caller must
lock the LockableFabgzReader or otherwise handle concurrent access if sharing between
in-process concurrent execution threads, such as asyncio (e.g. WSGI/ASGI web servers)
"""
_logger.debug("Opening for reading: %s", path)
if not os.path.exists(path):
_logger.error("_open_for_reading path does not exist: %s", path)
return LockableFabgzReader(path)

def _dump_aliases(self):
import prettytable
Expand All @@ -224,3 +261,4 @@ def _dump_aliases(self):
for r in cursor:
pt.add_row([r[f] for f in fields])
print(pt)
cursor.close()
3 changes: 3 additions & 0 deletions src/biocommons/seqrepo/seqaliasdb/seqaliasdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ def __init__(
# ############################################################################
# Special methods

def __del__(self):
self._db.close()

def __contains__(self, seq_id):
cursor = self._db.cursor()
cursor.execute(
Expand Down
3 changes: 2 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ def dataproxy():

@pytest.fixture(scope="session")
def rest_dataproxy():
return SeqRepoRESTDataProxy(base_url="http://localhost:5000/seqrepo")
url = os.environ.get("SEQREPO_REST_URL", "http://localhost:5000/seqrepo")
return SeqRepoRESTDataProxy(base_url=url)


@pytest.fixture(scope="session")
Expand Down
2 changes: 1 addition & 1 deletion tests/test_fabgz.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def test_write_reread():

# now read them back
far = FabgzReader(fabgz_fn)
assert far.filename.startswith("/tmp/".encode())
assert far.filename.startswith(tmpdir.encode())
assert set(far.keys()) == set(sequences.keys())
assert 5 == len(far), "expected 5 sequences"
assert "l10" in far.keys()
Expand Down

0 comments on commit 48958d7

Please sign in to comment.