From d3ebde98a0626b854325afba38cdb72966d04ed8 Mon Sep 17 00:00:00 2001 From: David Manthey Date: Mon, 7 Mar 2022 13:07:44 -0500 Subject: [PATCH] Improve girder-mount command. Add diskcache optionally to mount. This allows some caching at the fuse mount level which can reduce traffic to non-filesystem assetstores (such as S3). This substantially improves speed when using inefficient file readers on such assetstores (e.g., openslide reading Hamamatsu ndpi files). --- girder/cli/mount.py | 176 ++++++++++++++++++++++++++++++++------ setup.py | 2 + tests/cases/mount_test.py | 36 ++++---- 3 files changed, 170 insertions(+), 44 deletions(-) diff --git a/girder/cli/mount.py b/girder/cli/mount.py index c7e3b64cfb..297fbcaaff 100644 --- a/girder/cli/mount.py +++ b/girder/cli/mount.py @@ -1,24 +1,29 @@ -import cherrypy -import click import errno -import fuse +import functools +import hashlib +import logging import os import stat import sys import threading import time +import cachetools +import cherrypy +import click +import fuse + import girder from girder import events, logger, logprint -from girder.exceptions import AccessException, ValidationException +from girder.exceptions import AccessException, FilePathException, ValidationException from girder.models.file import File from girder.models.folder import Folder from girder.models.item import Item from girder.models.setting import Setting from girder.settings import SettingKey from girder.utility import config -from girder.utility.model_importer import ModelImporter from girder.utility import path as path_util +from girder.utility.model_importer import ModelImporter from girder.utility.server import configureServer @@ -32,7 +37,7 @@ class ServerFuse(fuse.Operations): use_ns = True - def __init__(self, stat=None): + def __init__(self, stat=None, options=None): """ Instantiate the operations class. This sets up tracking for open files and file descriptor numbers (handles). 
@@ -42,7 +47,10 @@ def __init__(self, stat=None): the same uid, gid, and atime. If the resource lacks both an updated and a created time stamp, the ctime and mtime will also be taken from this. If None, this defaults to the user of the Girder - process's home directory, + process's home directory. + :param options: a dictionary of additional options. May be modified if + some options are directly handled rather than are to be passed to + the mount command. """ super().__init__() if not stat: @@ -54,6 +62,52 @@ def __init__(self, stat=None): self.nextFH = 1 self.openFiles = {} self.openFilesLock = threading.Lock() + options = options or {} + self.cache = cachetools.TTLCache(maxsize=10000, ttl=int(options.pop('stat_cache_ttl', 1))) + self.diskcache = None + self._configure_disk_cache(options) + + def _configure_disk_cache(self, cacheopts): + """ + Configure the disk cache. + + :param cacheopts: An optional dictionary of options. Any option that + starts with 'diskcache' will be passed to diskcache.Cache without + the 'diskcache' prefix. 'diskcache' by itself is a boolean to + enable or disable the diskcache. + """ + use = None + try: + import diskcache + except ImportError: + use = False + + options = { + 'directory': '~/.cache/girder-mount', + # It would be nicer to use 'least-recently-used', but it is + # comparatively expensive, as every access requires a database + # write. least-recently-stored does not. 
+ 'eviction_policy': 'least-recently-stored', + # This seems to be necessary to allow concurrent access from + # multiple mounts + 'sqlite_journal_mode': 'truncate', + + } + for key in list((cacheopts or {}).keys()): + if key.startswith('diskcache'): + value = cacheopts.pop(key) + key = key[len('diskcache'):].lstrip('_') + use = True if use is None else use + if key: + options[key] = value + elif not value: + use = False + if use: + chunk = int(options.pop('chunk', 128 * 1024)) + self.diskcache = { + 'chunk': chunk, + 'cache': diskcache.Cache(**options) + } def __call__(self, op, path, *args, **kwargs): """ @@ -64,11 +118,11 @@ def __call__(self, op, path, *args, **kwargs): etc.). """ logger.debug('-> %s %s %s', op, path, repr(args)) - ret = '[exception]' try: ret = getattr(self, op)(path, *args, **kwargs) return ret except Exception as e: + ret = '[exception]' # Log all exceptions and then reraise them if getattr(e, 'errno', None) in (errno.ENOENT, errno.EACCES): logger.debug('-- %s %r', op, e) @@ -81,7 +135,9 @@ def __call__(self, op, path, *args, **kwargs): else: logger.debug('<- %s (length %d) %r', op, len(ret), ret[:16]) - def _getPath(self, path): + @cachetools.cachedmethod(lambda self: self.cache, key=functools.partial( + cachetools.keys.hashkey, '_get_path')) + def _get_path(self, path): """ Given a fuse path, return the associated resource. @@ -121,7 +177,8 @@ def _stat(self, doc, model): # str(doc['_id'])).hexdigest()[-8:], 16) ). There doesn't seem to be # any measurable benefit of this, however, so we specify use_ino false # in the mount and set the value to -1 here. 
- attr['st_ino'] = -1 + # attr['st_ino'] = -1 + attr['st_ino'] = int(hashlib.sha512(str(doc['_id']).encode()).hexdigest()[-8:], 16) attr['st_nlink'] = 1 if 'updated' in doc: attr['st_mtime'] = int(time.mktime(doc['updated'].timetuple()) * 1e9) @@ -216,6 +273,8 @@ def flush(self, path, fh=None): """ return 0 + @cachetools.cachedmethod(lambda self: self.cache, key=functools.partial( + cachetools.keys.hashkey, 'getattr')) def getattr(self, path, fh=None): """ Get the attributes dictionary of a path. @@ -230,7 +289,7 @@ def getattr(self, path, fh=None): attr['st_mode'] = 0o500 | stat.S_IFDIR attr['st_size'] = 0 else: - resource = self._getPath(path) + resource = self._get_path(path) attr = self._stat(resource['document'], resource['model']) if attr.get('st_blksize') and attr.get('st_size'): attr['st_blocks'] = int( @@ -253,7 +312,42 @@ def read(self, path, size, offset, fh): if fh not in self.openFiles: raise fuse.FuseOSError(errno.EBADF) info = self.openFiles[fh] + if self.diskcache and info.get('allowcache'): + result = b'' + for idx in range( + offset // self.diskcache['chunk'], + (offset + size + self.diskcache['chunk'] - 1) // self.diskcache['chunk']): + idxoffset = idx * self.diskcache['chunk'] + idxlen = min(self.diskcache['chunk'], info['size'] - idxoffset) + key = '%s-%d-%d' % (info['hash'], idxoffset, idxlen) + try: + data = self.diskcache['cache'].get(key, None, read=True) + except Exception: + logger.exception('diskcache threw an exception in get') + data = None + if data is None: + with info['lock']: + if 'handle' not in info: + info['handle'] = File().open(info['document']) + handle = info['handle'] + handle.seek(idxoffset) + data = handle.read(idxlen) + try: + self.diskcache['cache'][key] = data + except Exception: + logger.exception('diskcache threw an exception in set') + result += data[max(0, offset - idxoffset): + min(len(data), offset + size - idxoffset)] + else: + data.seek(max(0, offset - idxoffset)) + result += data.read(size - len(result)) + 
return result with info['lock']: + if 'handle' not in info: + info['handle'] = ( + File().open(info['document']) + if not info.get('directpath') else + open(info['directpath'], 'rb')) handle = info['handle'] handle.seek(offset) return handle.read(size) @@ -277,7 +371,7 @@ def readdir(self, path, fh): for doc in docList: result.append(self._name(doc, model)) else: - resource = self._getPath(path) + resource = self._get_path(path) result.extend(self._list(resource['document'], resource['model'])) return result @@ -290,7 +384,7 @@ def open(self, path, flags): read only. :returns: a file descriptor. """ - resource = self._getPath(path) + resource = self._get_path(path) if resource['model'] != 'file': return super().open(path, flags) if flags & (os.O_APPEND | os.O_ASYNC | os.O_CREAT | os.O_DIRECTORY @@ -298,9 +392,25 @@ def open(self, path, flags): raise fuse.FuseOSError(errno.EROFS) info = { 'path': path, - 'handle': File().open(resource['document']), + 'document': resource['document'], + 'hash': ( + resource['document']['sha512'] + if 'sha512' in resource['document'] else + '%s-%d-%s' % ( + resource['document']['_id'], resource['document']['size'], + resource['document'].get('updated', resource['document']['created']))), + 'size': resource['document']['size'], + 'allowcache': True, 'lock': threading.Lock(), } + try: + directpath = File().getAssetstoreAdapter( + resource['document']).getLocalFilePath(resource['document']) + if directpath: + info['allowcache'] = False + info['directpath'] = directpath + except FilePathException: + pass with self.openFilesLock: fh = self.nextFH self.nextFH += 1 @@ -397,11 +507,16 @@ def unmountServer(path, lazy=False, quiet=False): 'the default database will be used.') @click.option( '-o', '--options', 'fuseOptions', default=None, - help='Comma separated list of additional FUSE mount options. 
' - 'ro and use_ino cannot be overridden.') -@click.option( - '-q', '--quiet', is_flag=True, default=False, - help='Suppress Girder startup information or unmount output.') + help='Comma separated list of additional FUSE mount options. ro cannot ' + 'be overridden. Some additional options can be specified: Options ' + 'beginning with diskcache are used to create a diskcache for somewhat ' + 'persistent local data storage. These are passed to diskcache.Cache ' + 'with the "diskcache" prefix removed. diskcache by itself will enable ' + 'the default diskcache. diskcache_directory and diskcache_size_limit (in ' + 'bytes) are the most common. The directory defaults to ' + '~/.cache/girder-mount. stat_cache_ttl specifies how long in seconds ' + 'attributes are cached for girder documents. A longer time reduces ' + 'network access but could result in stale permissions or miss updates.') @click.option( '-u', '--umount', '--unmount', 'unmount', is_flag=True, default=False, help='Unmount a mounted FUSE filesystem.') @@ -409,15 +524,20 @@ def unmountServer(path, lazy=False, quiet=False): '-l', '-z', '--lazy', 'lazy', is_flag=True, default=False, help='Lazy unmount.') @click.option('--plugins', default=None, help='Comma separated list of plugins to import.') -def main(path, database, fuseOptions, quiet, unmount, lazy, plugins): +@click.option( + '-q', '--quiet', is_flag=True, default=False, + help='Suppress Girder startup information or unmount output.') +@click.option('-v', '--verbose', count=True) +def main(path, database, fuseOptions, unmount, lazy, plugins, quiet, verbose): if unmount or lazy: result = unmountServer(path, lazy, quiet) sys.exit(result) mountServer(path=path, database=database, fuseOptions=fuseOptions, - quiet=quiet, plugins=plugins) + quiet=quiet, verbose=verbose, plugins=plugins) -def mountServer(path, database=None, fuseOptions=None, quiet=False, plugins=None): +def mountServer(path, database=None, fuseOptions=None, quiet=False, verbose=0, + 
plugins=None): """ Perform the mount. @@ -429,14 +549,18 @@ def mountServer(path, database=None, fuseOptions=None, quiet=False, plugins=None case insensitive. For instance, 'foreground' or 'foreground=True' will keep this program running until the SIGTERM or unmounted. :param quiet: if True, suppress Girder logs. + :param verbose: if not zero, log to stderr instead of the girder logs. :param plugins: an optional list of plugins to enable. If None, use the plugins that are configured. """ - if quiet: + if quiet or verbose: curConfig = config.getConfig() curConfig.setdefault('logging', {})['log_quiet'] = True curConfig.setdefault('logging', {})['log_level'] = 'FATAL' girder._attachFileLogHandlers() + if verbose: + logger.setLevel(max(1, logging.ERROR - verbose * 10)) + logger.addHandler(logging.StreamHandler()) if database and '://' in database: cherrypy.config['database']['uri'] = database if plugins is not None: @@ -444,7 +568,6 @@ def mountServer(path, database=None, fuseOptions=None, quiet=False, plugins=None webroot, appconf = configureServer(plugins=plugins) girder._setupCache() - opClass = ServerFuse(stat=os.stat(path)) options = { # By default, we run in the background so the mount command returns # immediately. If we run in the foreground, a SIGTERM will shut it @@ -454,7 +577,7 @@ def mountServer(path, database=None, fuseOptions=None, quiet=False, plugins=None # This lets the OS buffer files efficiently. 
'auto_cache': True, # We aren't specifying our own inos - 'use_ino': False, + # 'use_ino': False, # read-only file system 'ro': True, } @@ -469,10 +592,11 @@ def mountServer(path, database=None, fuseOptions=None, quiet=False, plugins=None True if value.lower() == 'true' else value) else: key, value = opt, True - if key in ('use_ino', 'ro', 'rw') and options.get(key) != value: + if key in ('ro', 'rw') and options.get(key) != value: logprint.warning('Ignoring the %s=%r option' % (key, value)) continue options[key] = value + opClass = ServerFuse(stat=os.stat(path), options=options) Setting().set(SettingKey.GIRDER_MOUNT_INFORMATION, {'path': path, 'mounttime': time.time()}) FUSELogError(opClass, path, **options) diff --git a/setup.py b/setup.py index 7e620c9628..4ea20223a5 100644 --- a/setup.py +++ b/setup.py @@ -46,6 +46,8 @@ def prerelease_local_scheme(version): 'paramiko' ], 'mount': [ + 'cachetools', + 'diskcache', 'fusepy>=3.0' ] } diff --git a/tests/cases/mount_test.py b/tests/cases/mount_test.py index bd6c0fff8e..42a165d54c 100644 --- a/tests/cases/mount_test.py +++ b/tests/cases/mount_test.py @@ -278,22 +278,22 @@ def testFunctionCall(self): def testFunctionGetPath(self): op = mount.ServerFuse() - resource = op._getPath(self.publicFileName) + resource = op._get_path(self.publicFileName) self.assertEqual(resource['model'], 'file') - resource = op._getPath(os.path.dirname(self.publicFileName)) + resource = op._get_path(os.path.dirname(self.publicFileName)) self.assertEqual(resource['model'], 'item') - resource = op._getPath(os.path.dirname(os.path.dirname(self.publicFileName))) + resource = op._get_path(os.path.dirname(os.path.dirname(self.publicFileName))) self.assertEqual(resource['model'], 'folder') - resource = op._getPath(self.privateFileName) + resource = op._get_path(self.privateFileName) self.assertEqual(resource['model'], 'file') with self.assertRaises(fuse.FuseOSError): - op._getPath('nosuchpath') + op._get_path('nosuchpath') def testFunctionStat(self): 
op = mount.ServerFuse() - resource = op._getPath(self.publicFileName) + resource = op._get_path(self.publicFileName) attr = op._stat(resource['document'], resource['model']) - self.assertEqual(attr['st_ino'], -1) + self.assertIn('st_ino', attr) self.assertEqual(attr['st_nlink'], 1) self.assertGreater(attr['st_mtime'], time.time() - 1e5) self.assertEqual(attr['st_ctime'], attr['st_mtime']) @@ -302,43 +302,43 @@ def testFunctionStat(self): resource['document']['updated'] = datetime.datetime.utcfromtimestamp(time.time() + 1) File().save(resource['document']) oldmtime = attr['st_mtime'] - resource = op._getPath(self.publicFileName) + resource = op._get_path(self.publicFileName) attr = op._stat(resource['document'], resource['model']) self.assertGreater(attr['st_mtime'], oldmtime) - resource = op._getPath(os.path.dirname(self.publicFileName)) + resource = op._get_path(os.path.dirname(self.publicFileName)) attr = op._stat(resource['document'], resource['model']) self.assertEqual(attr['st_mode'], 0o500 | stat.S_IFDIR) self.assertEqual(attr['st_size'], 0) - resource = op._getPath(os.path.dirname(os.path.dirname(self.publicFileName))) + resource = op._get_path(os.path.dirname(os.path.dirname(self.publicFileName))) attr = op._stat(resource['document'], resource['model']) self.assertEqual(attr['st_mode'], 0o500 | stat.S_IFDIR) self.assertEqual(attr['st_size'], 0) def testFunctionName(self): op = mount.ServerFuse() - resource = op._getPath(self.publicFileName) + resource = op._get_path(self.publicFileName) name = op._name(resource['document'], resource['model']) self.assertEqual(name, os.path.basename(self.publicFileName)) - resource = op._getPath(os.path.dirname(self.publicFileName)) + resource = op._get_path(os.path.dirname(self.publicFileName)) name = op._name(resource['document'], resource['model']) self.assertEqual(name, os.path.basename(os.path.dirname(self.publicFileName))) def testFunctionList(self): op = mount.ServerFuse() - resource = 
op._getPath(os.path.dirname(self.publicFileName)) + resource = op._get_path(os.path.dirname(self.publicFileName)) filelist = op._list(resource['document'], resource['model']) self.assertIn(os.path.basename(self.publicFileName), filelist) - resource2 = op._getPath(os.path.dirname(os.path.dirname(self.publicFileName))) + resource2 = op._get_path(os.path.dirname(os.path.dirname(self.publicFileName))) filelist = op._list(resource2['document'], resource2['model']) self.assertIn(os.path.basename(os.path.dirname(self.publicFileName)), filelist) - resource3 = op._getPath(os.path.dirname(self.adminFileName)) + resource3 = op._get_path(os.path.dirname(self.adminFileName)) filelist = op._list(resource3['document'], resource3['model']) self.assertIn(os.path.basename(self.adminFileName), filelist) - resource4 = op._getPath(os.path.dirname(os.path.dirname(self.adminFileName))) + resource4 = op._get_path(os.path.dirname(os.path.dirname(self.adminFileName))) filelist = op._list(resource4['document'], resource4['model']) self.assertIn(os.path.basename(os.path.dirname(self.adminFileName)), filelist) - resource5 = op._getPath(os.path.dirname(os.path.dirname( + resource5 = op._get_path(os.path.dirname(os.path.dirname( os.path.dirname(self.adminFileName)))) filelist = op._list(resource5['document'], resource5['model']) self.assertIn(os.path.basename(os.path.dirname( @@ -358,7 +358,7 @@ def testFunctionGetattr(self): self.assertEqual(attr['st_mode'], 0o500 | stat.S_IFDIR) self.assertEqual(attr['st_size'], 0) attr = op.getattr(self.publicFileName) - self.assertEqual(attr['st_ino'], -1) + self.assertIn('st_ino', attr) self.assertEqual(attr['st_nlink'], 1) self.assertGreater(attr['st_mtime'], time.time() - 1e5) self.assertEqual(attr['st_ctime'], attr['st_mtime'])