Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

XENON RunDB Frontend #100

Merged
merged 6 commits into from
Oct 3, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@
'graphviz'],
'xenon': ['keras',
'tensorflow',
'scipy']
'scipy',
'pymongo',]
},
long_description_content_type="text/markdown",
packages=setuptools.find_packages(),
Expand Down
61 changes: 34 additions & 27 deletions strax/storage/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import tempfile

import boto3
from botocore.client import Config
from botocore.exceptions import ClientError

import strax
from strax import StorageFrontend
Expand Down Expand Up @@ -66,19 +68,21 @@ def __init__(self,
raise EnvironmentError("S3 secret key not specified")
s3_secret_access_key = os.environ.get('S3_SECRET_ACCESS_KEY')

self.boto3_client_kwargs = {'aws_access_key_id': s3_access_key_id,
'aws_secret_access_key': s3_secret_access_key,
'endpoint_url': endpoint_url,
'service_name': 's3',
'config': Config(connect_timeout=5,
retries={'max_attempts': 10})}

# Initialized connection to S3-protocol storage
self.s3 = boto3.client(aws_access_key_id=s3_access_key_id,
aws_secret_access_key=s3_secret_access_key,
endpoint_url=endpoint_url,
service_name='s3')
self.s3 = boto3.client(**self.boto3_client_kwargs)

# Create bucket (does nothing if exists)
self.s3.create_bucket(Bucket=BUCKET_NAME)
# self.s3.create_bucket(Bucket=BUCKET_NAME)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was this accidentally commented out or do we no longer need it?


# Setup backends for reading
self.backends = [S3Backend(aws_access_key_id=s3_access_key_id,
aws_secret_access_key=s3_secret_access_key,
endpoint_url=endpoint_url)]
self.backends = [S3Backend(**self.boto3_client_kwargs), ]

def _find(self, key, write, fuzzy_for, fuzzy_for_options):
"""Determine if data exists
Expand All @@ -88,26 +92,26 @@ def _find(self, key, write, fuzzy_for, fuzzy_for_options):
if fuzzy_for or fuzzy_for_options:
raise NotImplementedError("Can't do fuzzy with S3")

# Check exact match / write case
key_str = str(key)
bk = self.backend_key(key_str)

# See if any objects exist for this key
objects_list = self.s3.list_objects(Bucket=BUCKET_NAME,
Prefix=key_str)
if 'Contents' in objects_list:
if write and not self._can_overwrite(key):
raise strax.DataExistsError(at=bk)
return bk
else:
# No objects yet...
if write:
return bk
try:
self.backends[0].get_metadata(key)
except ClientError as ex:
if ex.response['Error']['Code'] == 'NoSuchKey':
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you catch this in get_metadata and raise DataNotAvailable there? This would fix #90. (Then here, you'd catch DataNotAvailable again if you're writing.)

The S3 frontend is unusual in that _find relies on get_metadata rather than the other way around. But the context relies on DataNotAvailable being raised if get_metadata fails, so it can try other storage frontends (https://github.com/AxFoundation/strax/blob/master/strax/context.py#L632).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually it turns out #90 has already been fixed, so never mind.

if write:
return bk
else:
raise strax.DataNotAvailable
else:
# If reading and no objects, then problem
raise strax.DataNotAvailable
raise ex

if write and not self._can_overwrite(key):
raise strax.DataExistsError(at=bk)
return bk

def backend_key(self, key_str):
    """Return the (backend name, key string) pair identifying data under *key_str*.

    The first registered backend is treated as the canonical one; its class
    name is what gets stored alongside the key.
    """
    backend_name = type(self.backends[0]).__name__
    return backend_name, key_str

def remove(self, key):
Expand All @@ -125,8 +129,7 @@ def __init__(self, **kwargs):
super().__init__()

# Initialized connection to S3-protocol storage
self.s3 = boto3.client(**kwargs,
service_name='s3')
self.s3 = boto3.client(**kwargs)
self.kwargs = kwargs # Used later for setting up Saver

def get_metadata(self, backend_key):
Expand Down Expand Up @@ -169,12 +172,14 @@ class S3Saver(strax.Saver):
def __init__(self, key, metadata, meta_only,
**kwargs):
super().__init__(metadata, meta_only)
self.s3 = boto3.client(**kwargs,
service_name='s3')
self.s3 = boto3.client(**kwargs)

# Unique key specifying processing of a run
self.strax_unique_key = key

self.config = boto3.s3.transfer.TransferConfig(max_concurrency=40,
num_download_attempts=30)

def _save_chunk(self, data, chunk_info):
# Keyname
key_name = f"{self.strax_unique_key}/{chunk_info['chunk_i']:06d}"
Expand All @@ -187,7 +192,8 @@ def _save_chunk(self, data, chunk_info):
f.seek(0)
self.s3.upload_fileobj(f,
BUCKET_NAME,
key_name)
key_name,
Config=self.config)

return dict(key_name=key_name,
filesize=filesize)
Expand All @@ -207,6 +213,7 @@ def _close(self):
prefix = f'{self.strax_unique_key}/metadata_'
objects_list = self.s3.list_objects(Bucket=BUCKET_NAME,
Prefix=prefix)

if 'Contents' in objects_list:
for file in objects_list['Contents']:
# Grab chunk metadata as ASCIII
Expand Down
1 change: 1 addition & 0 deletions strax/xenon/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
from . import common, plugins, cut_plugins, pax_interface # noqa
from .rundb import *
73 changes: 73 additions & 0 deletions strax/xenon/rundb.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import os
import re
import socket

import pymongo

import strax

export, __all__ = strax.exporter()


@export
class RunDB(strax.StorageFrontend):
    """Frontend that searches the XENON runs database (MongoDB) for data.

    Loads appropriate backends ranging from Files to S3.

    NOTE(review): writing appears to be supported, but newly written data is
    not registered back into the runs database — confirm with callers.
    The write target is the local filesystem when running on the dali
    cluster, and S3 otherwise (see ``_find``).
    """

    def __init__(self,
                 mongo_url,
                 path='.',
                 s3_kwargs=None,
                 *args, **kwargs):
        """
        :param mongo_url: MongoDB connection URL for the runs database.
        :param path: Base directory used when writing via the filesystem
            backend.
        :param s3_kwargs: Keyword arguments forwarded to strax.S3Backend.
        """
        super().__init__(*args, **kwargs)

        # Avoid the mutable-default-argument pitfall: a literal {} default
        # would be a single dict object shared by every instance.
        if s3_kwargs is None:
            s3_kwargs = {}

        self.client = pymongo.MongoClient(mongo_url)
        self.collection = self.client['xenon1t']['runs']

        self.path = path

        # Setup backends for reading. Don't change order!
        self.backends = [
            strax.S3Backend(**s3_kwargs),
        ]

        # Identify the cluster from the fully-qualified domain name.
        self.dali = bool(re.match(r'^dali.*rcc.*', socket.getfqdn()))
        if self.dali:
            self.backends.append(strax.FileSytemBackend())

    def _find(self, key: strax.DataKey, write, fuzzy_for, fuzzy_for_options):
        """Return a (protocol, location) tuple for the data identified by *key*.

        :raises strax.DataNotAvailable: if reading and no matching data exists.
        :raises strax.DataExistsError: if writing, data already exists, and
            overwriting is not allowed.
        :raises NotImplementedError: if fuzzy matching is requested.
        """
        if fuzzy_for or fuzzy_for_options:
            raise NotImplementedError("Can't do fuzzy with RunDB yet.")

        # Only consider data on hosts reachable from where we are running.
        query = {'name': key.run_id,
                 'data.type': key.data_type,
                 '$or': [{'data.host': 'ceph-s3'}]}
        if self.dali:
            query['$or'].append({'data.host': 'dali'})

        # Project out only the data entry whose type and lineage match,
        # so doc['data'] has at most one element.
        doc = self.collection.find_one(
            query,
            {'data': {'$elemMatch': {'type': key.data_type,
                                     'meta.lineage': key.lineage}}})

        # doc.get('data') also guards against a present-but-empty/None field,
        # where the original len(doc['data']) would have crashed.
        if doc is None or not doc.get('data'):
            if write:
                # The last backend is the filesystem on dali, S3 otherwise.
                return (self.backends[-1].__class__.__name__,
                        os.path.join(self.path, str(key)))
            # If reading and no objects, then problem
            raise strax.DataNotAvailable

        datum = doc['data'][0]

        if write and not self._can_overwrite(key):
            raise strax.DataExistsError(at=datum['protocol'])

        return datum['protocol'], datum['location']

    def remove(self, key):
        """Deleting data through the run database is not supported."""
        raise NotImplementedError()