Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fuse #53

Merged
merged 27 commits into from
Jan 15, 2018
Merged

Fuse #53

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
sudo: False
sudo: True

language: python

Expand All @@ -9,6 +9,10 @@ matrix:
- python: 3.6

install:
# install fuse libs
- sudo apt-get update
- sudo apt-get install libfuse-dev

# Install conda
- wget http://repo.continuum.io/miniconda/Miniconda-latest-Linux-x86_64.sh -O miniconda.sh
- bash miniconda.sh -b -p $HOME/miniconda
Expand All @@ -17,10 +21,10 @@ install:
- conda update conda

# Install dependencies
- conda create -n test python=$TRAVIS_PYTHON_VERSION pytest requests
- conda create -n test python=$TRAVIS_PYTHON_VERSION pytest requests pandas
- source activate test
- conda install -c conda-forge google-auth google-auth-oauthlib
- pip install vcrpy
- pip install vcrpy fusepy

script:
- GCSFS_RECORD_MODE=none py.test -vv -x gcsfs
Expand Down
71 changes: 71 additions & 0 deletions docs/source/fuse.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
GCSFS and FUSE
==============

Warning, this functionality is **experimental**

FUSE_ is a mechanism to mount user-level filesystems in unix-like
systems (linux, osx, etc.). GCSFS is able to use FUSE to present remote
data/keys as if they were a directory on your local file-system. This
allows for standard shell command manipulation, and loading of data
by libraries that can only handle local file-paths (e.g., netCDF/HDF5).

.. _FUSE: https://github.com/libfuse/libfuse

Requirements
-------------

In addition to a standard installation of GCSFS, you also need:

- libfuse as a system install. The way to install this will depend
on your OS. Examples include ``sudo apt-get install fuse``,
``sudo yum install fuse`` and download from osxfuse_.

- fusepy_, which can be installed via conda or pip

- pandas, which can also be installed via conda or pip (this library is
used only for its timestring parsing.

.. _osxfuse: https://osxfuse.github.io/
.. _fusepy: https://github.com/terencehonles/fusepy

Usage
-----

Installation of GCSFS will make the command ``gcsfuse``. Use the flag ``--help``
to display the usage message.

You must provide a path on GCS to regard as the root directory for your data, and
a `mount point`, a location on your local file-system in which you want the remote
data to appear. For example, lets consider that the bucket ``mybucket`` contains a
key ``path/key``,
(full path ``mybucket/path/key``). If the remote root is set to ``mybucket/path``
and the mount point is ``~/gcs`` then after
mounting, listing the contents of ``~/gcs`` will show a file called ``key``.

.. code-block::bash

$ gcsfuse mybucket/path ~/fuse
$ ls ~/fuse
key

Starting the process in foreground mode will give some debug information on which
bytes in which keys are being read.

To stop the process, either use ^C (if in foreground mode), explicitly terminate
the process, or use the command ``umount`` with the mount point (in this example
``umount ~/gcs``).

Caveats
-------

This functionality is experimental. The command usage may change, and you should
expect exceptions.

Furthermore:

- although mutation operations tentatively work, you should not at the moment
depend on gcsfuse as a reliable system that won't loose your data.

- permissions on GCS are complicated, so all files will be shown as fully-open
0o777, regardless of state. If a read fails, you likely don't have the right
permissions.
1 change: 1 addition & 0 deletions docs/source/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ Contents
.. toctree::
api
developer
fuse
:maxdepth: 2


Expand Down
Empty file added gcsfs/cli/__init__.py
Empty file.
24 changes: 24 additions & 0 deletions gcsfs/cli/gcsfuse.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import click
from fuse import FUSE

from gcsfs.gcsfuse import GCSFS


@click.command()
@click.argument('bucket', type=str, required=True)
@click.argument('mount_point', type=str, required=True)
@click.option('--token', type=str, required=False, default=None,
help="Token to use for authentication")
@click.option('--project-id', type=str, required=False, default='',
help="Billing Project ID")
@click.option('--foreground/--background', default=True,
help="Run in the foreground or as a background process")
def main(bucket, mount_point, token, project_id, foreground):
""" Mount a Google Cloud Storage (GCS) bucket to a local directory """
print("Mounting bucket %s to directory %s" % (bucket, mount_point))
FUSE(GCSFS(bucket, token=token, project=project_id),
mount_point, nothreads=True, foreground=foreground)


if __name__ == '__main__':
main()
74 changes: 54 additions & 20 deletions gcsfs/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -532,19 +532,39 @@ def exists(self, path):

def info(self, path):
bucket, key = split_path(path)
if not key:
files = self.ls('', True)
f = [f for f in files if f['name'] == bucket]
if f:
return f[0]
if self.ls(bucket):
return {'bucket': bucket, 'kind': 'storage#object',
'size': 0, 'storageClass': 'DIRECTORY',
'name': bucket+'/'}
raise FileNotFoundError
if bucket not in self.dirs:
d = self._call('get', 'b/{}/o/{}', bucket, key)
d['name'] = '%s/%s' % (bucket, d['name'])
d['size'] = int(d.get('size'), 0)
return d
else:
path = '/'.join(split_path(path))
files = self.ls(path, True)
out = [f for f in files if f['name'] == path]
if out:
return out[0]
else:
raise FileNotFoundError(path)
try:
d = self._call('get', 'b/{}/o/{}', bucket, key)
d['name'] = '%s/%s' % (bucket, d['name'])
d['size'] = int(d.get('size'), 0)
return d
except FileNotFoundError:
pass
path1 = '/'.join(split_path(path))
out = []
try:
files = self.ls(path1, True)
out = [f for f in files if f['name'] == path1]
except FileNotFoundError:
pass
if not out:
# no such file, but try for such a directory
parent = path.rstrip('/').rsplit('/', 1)[0]
files = self.ls(parent, True)
out = [f for f in files if f['name'] == path.rstrip('/') + '/']
if out:
return out[0]
raise FileNotFoundError(path)

def url(self, path):
return self.info(path)['mediaLink']
Expand Down Expand Up @@ -728,6 +748,8 @@ def __init__(self, gcsfs, path, mode='rb', block_size=5 * 2 ** 20,
Custom metadata, in key/value pairs, added at file creation
"""
bucket, key = split_path(path)
if not key:
raise OSError('Attempt to open a bucket')
self.gcsfs = gcsfs
self.bucket = bucket
self.key = key
Expand Down Expand Up @@ -965,17 +987,29 @@ def _fetch(self, start, end):
self.cache = _fetch_range(self.details, self.gcsfs.session, start,
self.end)
if start < self.start:
new = _fetch_range(self.details, self.gcsfs.session, start,
self.start)
self.start = start
self.cache = new + self.cache
if self.end - end > self.blocksize:
self.start = start
self.end = end + self.blocksize
self.cache = _fetch_range(self.details, self.gcsfs.session,
self.start, self.end)
else:
new = _fetch_range(self.details, self.gcsfs.session, start,
self.start)
self.start = start
self.cache = new + self.cache
if end > self.end:
if self.end > self.size:
return
new = _fetch_range(self.details, self.gcsfs.session, self.end,
end + self.blocksize)
self.end = end + self.blocksize
self.cache = self.cache + new
if end - self.end > self.blocksize:
self.start = start
self.end = end + self.blocksize
self.cache = _fetch_range(self.details, self.gcsfs.session,
self.start, self.end)
else:
new = _fetch_range(self.details, self.gcsfs.session, self.end,
end + self.blocksize)
self.end = end + self.blocksize
self.cache = self.cache + new

def read(self, length=-1):
"""
Expand Down
137 changes: 137 additions & 0 deletions gcsfs/gcsfuse.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
from __future__ import print_function
import os
import stat
import pandas as pd
from errno import ENOENT, EIO
from fuse import Operations, FuseOSError
from gcsfs import GCSFileSystem, core
from pwd import getpwnam
from grp import getgrnam


def str_to_time(s):
t = pd.to_datetime(s)
return t.to_datetime64().view('int64') / 1e9


class GCSFS(Operations):

def __init__(self, path='.', gcs=None, **fsargs):
if gcs is None:
self.gcs = GCSFileSystem(**fsargs)
else:
self.gcs = gcs
self.cache = {}
self.counter = 0
self.root = path

def getattr(self, path, fh=None):
try:
info = self.gcs.info(''.join([self.root, path]))
except FileNotFoundError:
raise FuseOSError(ENOENT)
data = {'st_uid': 1000, 'st_gid': 1000}
perm = 0o777

if info['storageClass'] == 'DIRECTORY' or 'bucket' in info['kind']:
data['st_atime'] = 0
data['st_ctime'] = 0
data['st_mtime'] = 0
data['st_mode'] = (stat.S_IFDIR | perm)
data['st_size'] = 0
data['st_blksize'] = 0
else:
data['st_atime'] = str_to_time(info['timeStorageClassUpdated'])
data['st_ctime'] = str_to_time(info['timeCreated'])
data['st_mtime'] = str_to_time(info['updated'])
data['st_mode'] = (stat.S_IFREG | perm)
data['st_size'] = info['size']
data['st_blksize'] = 5 * 2**20
data['st_nlink'] = 1

return data

def readdir(self, path, fh):
path = ''.join([self.root, path])
files = self.gcs.ls(path)
files = [f.rstrip('/').rsplit('/', 1)[1] for f in files]
return ['.', '..'] + files

def mkdir(self, path, mode):
bucket, key = core.split_path(path)
if not self.gcs.info(path):
self.gcs.dirs['bucket'].append({
'bucket': bucket, 'kind': 'storage#object',
'size': 0, 'storageClass': 'DIRECTORY',
'name': path.rstrip('/') + '/'})

def rmdir(self, path):
info = self.gcs.info(path)
if info['storageClass': 'DIRECTORY']:
self.gcs.rm(path, False)

def read(self, path, size, offset, fh):
print('read', path, size, offset, fh)
fn = ''.join([self.root, path])
f = self.cache[fn]
f.seek(offset)
out = f.read(size)
return out

def write(self, path, data, offset, fh):
print('write', path, offset, fh)
f = self.cache[fh]
f.write(data)
return len(data)

def create(self, path, flags):
print('create', path, oct(flags))
fn = ''.join([self.root, path])
self.gcs.touch(fn) # this makes sure directory entry exists - wasteful!
# write (but ignore creation flags)
f = self.gcs.open(fn, 'wb')
self.cache[self.counter] = f
self.counter += 1
return self.counter - 1

def open(self, path, flags):
print('open', path, oct(flags))
fn = ''.join([self.root, path])
if flags % 2 == 0:
# read
f = self.gcs.open(fn, 'rb')
else:
# write (but ignore creation flags)
f = self.gcs.open(fn, 'wb')
self.cache[self.counter] = f
self.counter += 1
return self.counter - 1

def truncate(self, path, length, fh=None):
print('truncate', path, length, fh)
fn = ''.join([self.root, path])
if length != 0:
raise NotImplementedError
# maybe should be no-op since open with write sets size to zero anyway
self.gcs.touch(fn)

def unlink(self, path):
print('delete', path)
fn = ''.join([self.root, path])
try:
self.gcs.rm(fn, False)
except (IOError, FileNotFoundError):
raise FuseOSError(EIO)

def release(self, path, fh):
print('close', path, fh)
try:
f = self.cache[fh]
f.close()
self.cache.pop(fh, None) # should release any cache memory
except Exception as e:
print(e)
return 0

def chmod(self, path, mode):
raise NotImplementedError
Loading