Skip to content

Commit

Permalink
Merge pull request #3487 from girder/mount-diskcache
Browse files Browse the repository at this point in the history
Add diskcache optionally to girder-mount command
  • Loading branch information
manthey committed Dec 12, 2023
2 parents 5d197c5 + d3ebde9 commit 39b17ab
Show file tree
Hide file tree
Showing 3 changed files with 170 additions and 44 deletions.
176 changes: 150 additions & 26 deletions girder/cli/mount.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,29 @@
import cherrypy
import click
import errno
import fuse
import functools
import hashlib
import logging
import os
import stat
import sys
import threading
import time

import cachetools
import cherrypy
import click
import fuse

import girder
from girder import events, logger, logprint
from girder.exceptions import AccessException, ValidationException
from girder.exceptions import AccessException, FilePathException, ValidationException
from girder.models.file import File
from girder.models.folder import Folder
from girder.models.item import Item
from girder.models.setting import Setting
from girder.settings import SettingKey
from girder.utility import config
from girder.utility.model_importer import ModelImporter
from girder.utility import path as path_util
from girder.utility.model_importer import ModelImporter
from girder.utility.server import configureServer


Expand All @@ -32,7 +37,7 @@ class ServerFuse(fuse.Operations):

use_ns = True

def __init__(self, stat=None):
def __init__(self, stat=None, options=None):
"""
Instantiate the operations class. This sets up tracking for open
files and file descriptor numbers (handles).
Expand All @@ -42,7 +47,10 @@ def __init__(self, stat=None):
the same uid, gid, and atime. If the resource lacks both an
updated and a created time stamp, the ctime and mtime will also be
taken from this. If None, this defaults to the user of the Girder
process's home directory,
process's home directory.
:param options: a dictionary of additional options. May be modified if
some options are directly handled rather than are to be passed to
the mount command.
"""
super().__init__()
if not stat:
Expand All @@ -54,6 +62,52 @@ def __init__(self, stat=None):
self.nextFH = 1
self.openFiles = {}
self.openFilesLock = threading.Lock()
options = options or {}
self.cache = cachetools.TTLCache(maxsize=10000, ttl=int(options.pop('stat_cache_ttl', 1)))
self.diskcache = None
self._configure_disk_cache(options)

def _configure_disk_cache(self, cacheopts):
"""
Configure the disk cache.
:param cacheopts: An optional dictionary of options. Any option that
starts with 'diskcache' will be passed to diskcache.Cache without
the 'diskcache' prefix. 'diskcache' by itself is a boolean to
enable or disable the diskcache.
"""
use = None
try:
import diskcache
except ImportError:
use = False

options = {
'directory': '~/.cache/girder-mount',
# It would be nicer to use 'least-recently-used', but it is
# comparatively expensive, as every access requires a database
# write. least-recently-stored does not.
'eviction_policy': 'least-recently-stored',
# This seems to be necessary to allow concurrent access from
# multiple mounts
'sqlite_journal_mode': 'truncate',

}
for key in list((cacheopts or {}).keys()):
if key.startswith('diskcache'):
value = cacheopts.pop(key)
key = key[len('diskcache'):].lstrip('_')
use = True if use is None else use
if key:
options[key] = value
elif not value:
use = False
if use:
chunk = int(options.pop('chunk', 128 * 1024))
self.diskcache = {
'chunk': chunk,
'cache': diskcache.Cache(**options)
}

def __call__(self, op, path, *args, **kwargs):
"""
Expand All @@ -64,11 +118,11 @@ def __call__(self, op, path, *args, **kwargs):
etc.).
"""
logger.debug('-> %s %s %s', op, path, repr(args))
ret = '[exception]'
try:
ret = getattr(self, op)(path, *args, **kwargs)
return ret
except Exception as e:
ret = '[exception]'
# Log all exceptions and then reraise them
if getattr(e, 'errno', None) in (errno.ENOENT, errno.EACCES):
logger.debug('-- %s %r', op, e)
Expand All @@ -81,7 +135,9 @@ def __call__(self, op, path, *args, **kwargs):
else:
logger.debug('<- %s (length %d) %r', op, len(ret), ret[:16])

def _getPath(self, path):
@cachetools.cachedmethod(lambda self: self.cache, key=functools.partial(
cachetools.keys.hashkey, '_get_path'))
def _get_path(self, path):
"""
Given a fuse path, return the associated resource.
Expand Down Expand Up @@ -121,7 +177,8 @@ def _stat(self, doc, model):
# str(doc['_id'])).hexdigest()[-8:], 16) ). There doesn't seem to be
# any measurable benefit of this, however, so we specify use_ino false
# in the mount and set the value to -1 here.
attr['st_ino'] = -1
# attr['st_ino'] = -1
attr['st_ino'] = int(hashlib.sha512(str(doc['_id']).encode()).hexdigest()[-8:], 16)
attr['st_nlink'] = 1
if 'updated' in doc:
attr['st_mtime'] = int(time.mktime(doc['updated'].timetuple()) * 1e9)
Expand Down Expand Up @@ -216,6 +273,8 @@ def flush(self, path, fh=None):
"""
return 0

@cachetools.cachedmethod(lambda self: self.cache, key=functools.partial(
cachetools.keys.hashkey, 'getattr'))
def getattr(self, path, fh=None):
"""
Get the attributes dictionary of a path.
Expand All @@ -230,7 +289,7 @@ def getattr(self, path, fh=None):
attr['st_mode'] = 0o500 | stat.S_IFDIR
attr['st_size'] = 0
else:
resource = self._getPath(path)
resource = self._get_path(path)
attr = self._stat(resource['document'], resource['model'])
if attr.get('st_blksize') and attr.get('st_size'):
attr['st_blocks'] = int(
Expand All @@ -253,7 +312,42 @@ def read(self, path, size, offset, fh):
if fh not in self.openFiles:
raise fuse.FuseOSError(errno.EBADF)
info = self.openFiles[fh]
if self.diskcache and info.get('allowcache'):
result = b''
for idx in range(
offset // self.diskcache['chunk'],
(offset + size + self.diskcache['chunk'] - 1) // self.diskcache['chunk']):
idxoffset = idx * self.diskcache['chunk']
idxlen = min(self.diskcache['chunk'], info['size'] - idxoffset)
key = '%s-%d-%d' % (info['hash'], idxoffset, idxlen)
try:
data = self.diskcache['cache'].get(key, None, read=True)
except Exception:
logger.exception('diskcache threw an exception in get')
data = None
if data is None:
with info['lock']:
if 'handle' not in info:
info['handle'] = File().open(info['document'])
handle = info['handle']
handle.seek(idxoffset)
data = handle.read(idxlen)
try:
self.diskcache['cache'][key] = data
except Exception:
logger.exception('diskcache threw an exception in set')
result += data[max(0, offset - idxoffset):
min(len(data), offset + size - idxoffset)]
else:
data.seek(max(0, offset - idxoffset))
result += data.read(size - len(result))
return result
with info['lock']:
if 'handle' not in info:
info['handle'] = (
File().open(info['document'])
if not info.get('directpath') else
open(info['directpath'], 'rb'))
handle = info['handle']
handle.seek(offset)
return handle.read(size)
Expand All @@ -277,7 +371,7 @@ def readdir(self, path, fh):
for doc in docList:
result.append(self._name(doc, model))
else:
resource = self._getPath(path)
resource = self._get_path(path)
result.extend(self._list(resource['document'], resource['model']))
return result

Expand All @@ -290,17 +384,33 @@ def open(self, path, flags):
read only.
:returns: a file descriptor.
"""
resource = self._getPath(path)
resource = self._get_path(path)
if resource['model'] != 'file':
return super().open(path, flags)
if flags & (os.O_APPEND | os.O_ASYNC | os.O_CREAT | os.O_DIRECTORY
| os.O_EXCL | os.O_RDWR | os.O_TRUNC | os.O_WRONLY):
raise fuse.FuseOSError(errno.EROFS)
info = {
'path': path,
'handle': File().open(resource['document']),
'document': resource['document'],
'hash': (
resource['document']['sha512']
if 'sha512' in resource['document'] else
'%s-%d-%s' % (
resource['document']['_id'], resource['document']['size'],
resource['document'].get('updated', resource['document']['created']))),
'size': resource['document']['size'],
'allowcache': True,
'lock': threading.Lock(),
}
try:
directpath = File().getAssetstoreAdapter(
resource['document']).getLocalFilePath(resource['document'])
if directpath:
info['allowcache'] = False
info['directpath'] = directpath
except FilePathException:
pass
with self.openFilesLock:
fh = self.nextFH
self.nextFH += 1
Expand Down Expand Up @@ -397,27 +507,37 @@ def unmountServer(path, lazy=False, quiet=False):
'the default database will be used.')
@click.option(
'-o', '--options', 'fuseOptions', default=None,
help='Comma separated list of additional FUSE mount options. '
'ro and use_ino cannot be overridden.')
@click.option(
'-q', '--quiet', is_flag=True, default=False,
help='Suppress Girder startup information or unmount output.')
help='Comma separated list of additional FUSE mount options. ro cannot '
'be overridden. Some additional options can be specified: Options '
'beginning with diskcache are used to create a diskcache for somewhat '
'persistent local data storage. These are passed to diskcache.Cache '
'with the "diskcache" prefix removed. diskcache by itself will enable '
'the default diskcache. diskcache_directory and diskcache_size_limit (in '
'bytes) are the most common. The directory defaults to '
'~/.cache/girder-mount. stat_cache_ttl specifies how long in seconds '
'attributes are cached for girder documents. A longer time reduces '
'network access but could result in stale permissions or miss updates.')
@click.option(
'-u', '--umount', '--unmount', 'unmount', is_flag=True, default=False,
help='Unmount a mounted FUSE filesystem.')
@click.option(
'-l', '-z', '--lazy', 'lazy', is_flag=True, default=False,
help='Lazy unmount.')
@click.option('--plugins', default=None, help='Comma separated list of plugins to import.')
def main(path, database, fuseOptions, quiet, unmount, lazy, plugins):
@click.option(
'-q', '--quiet', is_flag=True, default=False,
help='Suppress Girder startup information or unmount output.')
@click.option('-v', '--verbose', count=True)
def main(path, database, fuseOptions, unmount, lazy, plugins, quiet, verbose):
if unmount or lazy:
result = unmountServer(path, lazy, quiet)
sys.exit(result)
mountServer(path=path, database=database, fuseOptions=fuseOptions,
quiet=quiet, plugins=plugins)
quiet=quiet, verbose=verbose, plugins=plugins)


def mountServer(path, database=None, fuseOptions=None, quiet=False, plugins=None):
def mountServer(path, database=None, fuseOptions=None, quiet=False, verbose=0,
plugins=None):
"""
Perform the mount.
Expand All @@ -429,22 +549,25 @@ def mountServer(path, database=None, fuseOptions=None, quiet=False, plugins=None
case insensitive. For instance, 'foreground' or 'foreground=True' will
keep this program running until the SIGTERM or unmounted.
:param quiet: if True, suppress Girder logs.
:param verbose: if not zero, log to stderr instead of the girder logs.
:param plugins: an optional list of plugins to enable. If None, use the
plugins that are configured.
"""
if quiet:
if quiet or verbose:
curConfig = config.getConfig()
curConfig.setdefault('logging', {})['log_quiet'] = True
curConfig.setdefault('logging', {})['log_level'] = 'FATAL'
girder._attachFileLogHandlers()
if verbose:
logger.setLevel(max(1, logging.ERROR - verbose * 10))
logger.addHandler(logging.StreamHandler())
if database and '://' in database:
cherrypy.config['database']['uri'] = database
if plugins is not None:
plugins = plugins.split(',')
webroot, appconf = configureServer(plugins=plugins)
girder._setupCache()

opClass = ServerFuse(stat=os.stat(path))
options = {
# By default, we run in the background so the mount command returns
# immediately. If we run in the foreground, a SIGTERM will shut it
Expand All @@ -454,7 +577,7 @@ def mountServer(path, database=None, fuseOptions=None, quiet=False, plugins=None
# This lets the OS buffer files efficiently.
'auto_cache': True,
# We aren't specifying our own inos
'use_ino': False,
# 'use_ino': False,
# read-only file system
'ro': True,
}
Expand All @@ -469,10 +592,11 @@ def mountServer(path, database=None, fuseOptions=None, quiet=False, plugins=None
True if value.lower() == 'true' else value)
else:
key, value = opt, True
if key in ('use_ino', 'ro', 'rw') and options.get(key) != value:
if key in ('ro', 'rw') and options.get(key) != value:
logprint.warning('Ignoring the %s=%r option' % (key, value))
continue
options[key] = value
opClass = ServerFuse(stat=os.stat(path), options=options)
Setting().set(SettingKey.GIRDER_MOUNT_INFORMATION,
{'path': path, 'mounttime': time.time()})
FUSELogError(opClass, path, **options)
Expand Down
2 changes: 2 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ def prerelease_local_scheme(version):
'paramiko'
],
'mount': [
'cachetools',
'diskcache',
'fusepy>=3.0'
]
}
Expand Down
Loading

0 comments on commit 39b17ab

Please sign in to comment.