diff --git a/misc/threading-tests/README.md b/misc/threading-tests/README.md
new file mode 100644
index 0000000..32b0c26
--- /dev/null
+++ b/misc/threading-tests/README.md
@@ -0,0 +1,91 @@
+# Threading Tests
+
+This directory contains seqrepo tests for file descriptor exhaustion, especially in threading contexts.
+The idea: make it easy to test combinations of thread counts and fd cache sizes.
+
+See https://github.com/biocommons/biocommons.seqrepo/issues/112
+
+## Examples
+
+### single thread, without fd caching
+
+```
+snafu$ ./threading-test -s /usr/local/share/seqrepo/2021-01-29/ -m 1000 -n 1
+2023-09-13 15:25:56 snafu biocommons.seqrepo.fastadir.fastadir[2274974] INFO File descriptor caching disabled
+2023-09-13 15:25:57 snafu root[2274974] INFO Queued 1000 accessions
+2023-09-13 15:25:57 snafu root[2274974] INFO Starting run with 1 threads
+2023-09-13 15:26:01 snafu root[2274974] INFO : Done; processed 1000 accessions
+2023-09-13 15:26:01 snafu root[2274974] INFO Fetched 1000 sequences in 4.281685499 s with 1 threads; 234 seq/sec
+```
+
+### single thread, with fd caching
+
+```
+snafu$ ./threading-test -s /usr/local/share/seqrepo/2021-01-29/ -m 1000 -n 1 -f 100
+2023-09-13 15:26:07 snafu biocommons.seqrepo.fastadir.fastadir[2275006] WARNING File descriptor caching enabled (size=100)
+2023-09-13 15:26:08 snafu root[2275006] INFO Queued 1000 accessions
+2023-09-13 15:26:08 snafu root[2275006] INFO Starting run with 1 threads
+2023-09-13 15:26:08 snafu root[2275006] INFO : Done; processed 1000 accessions
+2023-09-13 15:26:08 snafu root[2275006] INFO Fetched 1000 sequences in 0.41264548700000003 s with 1 threads; 2423 seq/sec
+CacheInfo(hits=934, misses=66, maxsize=100, currsize=66)
+```
+
+### five threads, without caching
+
+```
+snafu$ ./threading-test -s /usr/local/share/seqrepo/2021-01-29/ -m 1000 -n 5
+2023-09-13 15:26:16 snafu biocommons.seqrepo.fastadir.fastadir[2275039] INFO File descriptor caching disabled
+2023-09-13 15:26:17 snafu root[2275039] INFO Queued 1000 accessions
+2023-09-13 15:26:17 snafu root[2275039] INFO Starting run with 5 threads
+2023-09-13 15:26:19 snafu root[2275039] INFO : Done; processed 197 accessions
+2023-09-13 15:26:19 snafu root[2275039] INFO : Done; processed 200 accessions
+2023-09-13 15:26:19 snafu root[2275039] INFO : Done; processed 210 accessions
+2023-09-13 15:26:19 snafu root[2275039] INFO : Done; processed 198 accessions
+2023-09-13 15:26:19 snafu root[2275039] INFO : Done; processed 195 accessions
+2023-09-13 15:26:19 snafu root[2275039] INFO Fetched 1000 sequences in 5.946146807 s with 5 threads; 168 seq/sec
+```
+
+### five threads, with caching :-(
+
+```
+snafu$ ./threading-test -s /usr/local/share/seqrepo/2021-01-29/ -m 1000 -n 5 -f 10
+2023-09-13 15:26:32 snafu biocommons.seqrepo.fastadir.fastadir[2275104] WARNING File descriptor caching enabled (size=10)
+2023-09-13 15:26:33 snafu root[2275104] INFO Queued 1000 accessions
+2023-09-13 15:26:33 snafu root[2275104] INFO Starting run with 5 threads
+[E::bgzf_uncompress] Inflate operation failed: invalid distance too far back
+[E::fai_retrieve] Failed to retrieve block. (Seeking in a compressed, .gzi unindexed, file?)
+Exception in thread Thread-5:
+Traceback (most recent call last):
+  File "/usr/lib/python3.11/threading.py", line 1038, in _bootstrap_inner
+    self.run()
+```
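+The failure above is consistent with unsynchronized sharing of cached readers
+(an interpretation, not verified here): `functools.lru_cache` hands the *same*
+reader object to several threads, and a pysam `FastaFile` handle is not safe
+for concurrent `fetch` calls. A minimal sketch of the hazard pattern
+(hypothetical names and path, not seqrepo code):
+
+```python
+import functools
+import threading
+
+@functools.lru_cache(maxsize=10)
+def open_reader(path):
+    # lru_cache returns the same handle to every thread that asks for this path
+    return open(path, "rb")
+
+def worker(path):
+    fh = open_reader(path)
+    # two threads interleaving seek/read on one shared handle corrupt each
+    # other's file position -- the analogue of the bgzf errors above
+    fh.seek(0)
+    fh.read(16)
+
+threads = [threading.Thread(target=worker, args=("/etc/hostname",)) for _ in range(5)]
+```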
+### 1 thread, cache_size > # available fds
+
+Same as the successful single-thread run above, but limiting the process to 50
+open file descriptors (fewer than the cache size of 100) causes a failure:
+
+```
+snafu$ (ulimit -n 50; ./threading-test -s /usr/local/share/seqrepo/2021-01-29/ -m 1000 -n 1 -f 100)
+2023-09-13 15:31:21 snafu biocommons.seqrepo.fastadir.fastadir[2275776] WARNING File descriptor caching enabled (size=100)
+2023-09-13 15:31:21 snafu root[2275776] INFO Queued 1000 accessions
+2023-09-13 15:31:21 snafu root[2275776] INFO Starting run with 1 threads
+[E::fai_load3_core] Failed to open FASTA index /usr/local/share/seqrepo/2021-01-29/sequences/2020/0412/1420/1586701238.5306098.fa.bgz.gzi: Too many open files
+Exception in thread Thread-1:
+Traceback (most recent call last):
+⋮
+```
+
+## Other useful commands
+
+```
+# continuously updated (every 2 s) list of open files in the seqrepo instance directory
+watch lsof +D '/usr/local/share/seqrepo/'
+
+# run with an arbitrary fd limit and cache size
+(ulimit -n 200; ./threading-test -s /usr/local/share/seqrepo/2021-01-29/ -a archive/accessions.gz -f 128)
+```
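+As a rule of thumb (an assumption, not something measured here), keep the fd
+cache size well below `ulimit -n`: each cached reader holds its `.fa.bgz` open,
+and the sqlite database, logging, and `.fai`/`.gzi` index opens may consume
+descriptors too. For example:
+
+```
+ulimit -n    # e.g. 1024
+# leave generous headroom below the limit
+./threading-test -s /usr/local/share/seqrepo/2021-01-29/ -m 1000 -n 1 -f 256
+```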
diff --git a/misc/threading-tests/threading-test b/misc/threading-tests/threading-test
new file mode 100755
index 0000000..09fe750
--- /dev/null
+++ b/misc/threading-tests/threading-test
@@ -0,0 +1,83 @@
+#!/usr/bin/env python3
+"""test seqrepo for file descriptor exhaustion, especially in threading contexts
+
+https://github.com/biocommons/biocommons.seqrepo/issues/112
+
+The idea: collect a batch of RefSeq NM accessions from the repo, then fetch a
+short slice of each sequence from worker threads.
+
+"""
+
+import argparse
+import logging
+import pathlib
+import queue
+import random
+import threading
+import time
+
+from biocommons.seqrepo import SeqRepo
+
+_logger = logging.getLogger()
+
+
+class Worker(threading.Thread):
+    def __init__(self, q: queue.Queue, sr: SeqRepo):
+        self.q = q
+        self.sr = sr
+        self.n = 0
+        super().__init__()
+
+    def run(self):
+        try:
+            while True:
+                ac = self.q.get(False)
+                self.sr.fetch(ac, 0, 5)
+                self.q.task_done()
+                self.n += 1
+        except queue.Empty:
+            _logger.info(f"{self}: Done; processed {self.n} accessions")
+            return
+
+
+def parse_args():
+    ap = argparse.ArgumentParser(description=__doc__)
+    ap.add_argument("-n", "--n-threads", type=int, default=1)
+    ap.add_argument("-s", "--seqrepo-path", type=pathlib.Path, required=True)
+    ap.add_argument("-m", "--max-accessions", type=int)
+    ap.add_argument("-f", "--fd-cache-size", type=int, default=0)
+    opts = ap.parse_args()
+    return opts
+
+
+if __name__ == "__main__":
+    import coloredlogs
+
+    coloredlogs.install(level="INFO")
+
+    opts = parse_args()
+
+    sr = SeqRepo(opts.seqrepo_path, fd_cache_size=opts.fd_cache_size)
+
+    acs = set(a["alias"] for a in sr.aliases.find_aliases(namespace="RefSeq", alias="NM_%"))
+    acs = random.sample(sorted(acs), opts.max_accessions or len(acs))
+    q = queue.Queue()
+    for ac in acs:
+        q.put(ac)
+    qs = q.qsize()
+    _logger.info(f"Queued {qs} accessions")
+
+    _logger.info(f"Starting run with {opts.n_threads} threads")
+    t0 = time.process_time()
+    for _ in range(opts.n_threads):
+        Worker(q=q, sr=sr).start()
+    q.join()
+    t1 = time.process_time()
+    td = t1 - t0
+    rate = float(qs) / td
+    _logger.info(f"Fetched {qs} sequences in {td} s with {opts.n_threads} threads; {rate:.0f} seq/sec")
+
+    if hasattr(sr.sequences._open_for_reading, "cache_info"):
+        print(sr.sequences._open_for_reading.cache_info())
\ No newline at end of file
diff --git a/misc/threading-verification.py b/misc/threading-tests/threading-verification.py
similarity index 100%
rename from misc/threading-verification.py
rename to misc/threading-tests/threading-verification.py
diff --git a/src/biocommons/seqrepo/fastadir/fabgz.py b/src/biocommons/seqrepo/fastadir/fabgz.py
index 02c6a2d..7376e64 100644
--- a/src/biocommons/seqrepo/fastadir/fabgz.py
+++ b/src/biocommons/seqrepo/fastadir/fabgz.py
@@ -15,6 +15,7 @@
 import shutil
 import stat
 import subprocess
+import threading
 
 import six
 from pysam import FastaFile
@@ -70,12 +71,24 @@ def _find_bgzip():
 
 
 class FabgzReader(object):
+    """
+    Reader for bgzip-compressed FASTA files that is also a context manager:
+    entering a ``with`` block acquires the instance's lock and returns the
+    reader, serializing access to the underlying file handle across threads.
+    """
+
     def __init__(self, filename):
+        self.lock = threading.Lock()
         self._fh = FastaFile(filename)
 
     def __del__(self):
         self._fh.close()
 
+    def __enter__(self):
+        self.lock.acquire()
+        return self
+
+    def __exit__(self, exc_type, exc_value, traceback):
+        self.lock.release()
+
     def fetch(self, seq_id, start=None, end=None):
         return self._fh.fetch(seq_id.encode("ascii"), start, end)
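How callers are expected to use the locking reader is not shown in this diff;
a sketch of the presumed pattern (the path and sequence id are illustrative):

```python
from biocommons.seqrepo.fastadir.fabgz import FabgzReader

reader = FabgzReader("/path/to/sequences.fa.bgz")  # illustrative path
with reader:  # __enter__ acquires reader.lock and returns the reader
    seq = reader.fetch("some_seq_id", 0, 10)  # illustrative sequence id
# __exit__ releases the lock so other threads can proceed
```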
diff --git a/src/biocommons/seqrepo/fastadir/fastadir.py b/src/biocommons/seqrepo/fastadir/fastadir.py
index 50de156..e1247ae 100644
--- a/src/biocommons/seqrepo/fastadir/fastadir.py
+++ b/src/biocommons/seqrepo/fastadir/fastadir.py
@@ -29,24 +29,6 @@
 expected_schema_version = 1
 
 
-class LockableFabgzReader(contextlib.AbstractContextManager):
-    """
-    Class that implements ContextManager and wraps a FabgzReader.
-    The FabgzReader is returned when acquired in a contextmanager with statement.
-    """
-
-    def __init__(self, path):
-        self.lock = threading.Lock()
-        self.fabgz_reader = FabgzReader(path)
-
-    def __enter__(self):
-        self.lock.acquire()
-        return self.fabgz_reader
-
-    def __exit__(self, exc_type, exc_value, traceback):
-        self.lock.release()
-
-
 class FastaDir(BaseReader, BaseWriter):
     """This class provides simple a simple key-value interface to a
     directory of compressed fasta files.
@@ -70,7 +52,7 @@ class FastaDir(BaseReader, BaseWriter):
 
     """
 
-    def __init__(self, root_dir, writeable=False, check_same_thread=True):
+    def __init__(self, root_dir, writeable=False, check_same_thread=True, fd_cache_size=0):
         """Creates a new sequence repository if necessary, and then opens it"""
 
         self._root_dir = root_dir
@@ -99,6 +81,18 @@ def __init__(self, root_dir, writeable=False, check_same_thread=True):
                     schema_version, expected_schema_version
                 )
             )
+
+        if fd_cache_size == 0:
+            _logger.info("File descriptor caching disabled")
+            def _open_for_reading(path):
+                _logger.debug("Opening for reading uncached: %s", path)
+                return FabgzReader(path)
+        else:
+            _logger.warning(f"File descriptor caching enabled (size={fd_cache_size})")
+            @functools.lru_cache(maxsize=fd_cache_size)
+            def _open_for_reading(path):
+                return FabgzReader(path)
+
+        self._open_for_reading = _open_for_reading
 
     def __del__(self):
         self._db.close()
@@ -238,19 +232,6 @@ def _upgrade_db(self):
         migrations_to_apply = backend.to_apply(migrations)
         backend.apply_migrations(migrations_to_apply)
 
-    @functools.lru_cache()
-    def _open_for_reading(self, path):
-        """
-        Opens a FabgzReader to path, wraps in a LockableFabgzReader for use in context managers.
-        Places it in an LRU cache so file is only opened once per FastaDir object. Caller must
-        lock the LockableFabgzReader or otherwise handle concurrent access if sharing between
-        in-process concurrent execution threads, such as asyncio (e.g. WSGI/ASGI web servers)
-        """
-        _logger.debug("Opening for reading: %s", path)
-        if not os.path.exists(path):
-            _logger.error("_open_for_reading path does not exist: %s", path)
-        return LockableFabgzReader(path)
-
     def _dump_aliases(self):
         import prettytable
diff --git a/src/biocommons/seqrepo/seqrepo.py b/src/biocommons/seqrepo/seqrepo.py
index a80ed9f..5f05394 100644
--- a/src/biocommons/seqrepo/seqrepo.py
+++ b/src/biocommons/seqrepo/seqrepo.py
@@ -100,6 +100,7 @@ def __init__(
         translate_ncbi_namespace=None,
         check_same_thread=False,
         use_sequenceproxy=True,
+        fd_cache_size=0,
     ):
         self._root_dir = root_dir
         self._upcase = upcase
@@ -122,6 +123,7 @@ def __init__(
             self._seq_path,
             writeable=self._writeable,
             check_same_thread=self._check_same_thread,
+            fd_cache_size=fd_cache_size,
         )
         self.aliases = SeqAliasDB(
             self._db_path,
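Taken together, the new `fd_cache_size` keyword threads through `SeqRepo` to
`FastaDir`. A minimal usage sketch (the repository path and accession are
illustrative):

```python
from biocommons.seqrepo import SeqRepo

# fd_cache_size=0 (the default) disables fd caching; a positive value keeps
# up to that many FabgzReader handles open via functools.lru_cache
sr = SeqRepo("/usr/local/share/seqrepo/2021-01-29", fd_cache_size=128)
print(sr.fetch("NM_000551.3", 0, 10))  # illustrative accession
```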