Skip to content

Commit

Permalink
Merge pull request #100 from AxFoundation/rundb
Browse files Browse the repository at this point in the history
XENON RunDB Frontend
  • Loading branch information
JelleAalbers committed Oct 3, 2018
2 parents 0fbe63a + f066f09 commit 2da113c
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 28 deletions.
3 changes: 2 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@
'graphviz'],
'xenon': ['keras',
'tensorflow',
'scipy']
'scipy',
'pymongo',]
},
long_description_content_type="text/markdown",
packages=setuptools.find_packages(),
Expand Down
61 changes: 34 additions & 27 deletions strax/storage/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import tempfile

import boto3
from botocore.client import Config
from botocore.exceptions import ClientError

import strax
from strax import StorageFrontend
Expand Down Expand Up @@ -66,19 +68,21 @@ def __init__(self,
raise EnvironmentError("S3 secret key not specified")
s3_secret_access_key = os.environ.get('S3_SECRET_ACCESS_KEY')

self.boto3_client_kwargs = {'aws_access_key_id': s3_access_key_id,
'aws_secret_access_key': s3_secret_access_key,
'endpoint_url': endpoint_url,
'service_name': 's3',
'config': Config(connect_timeout=5,
retries={'max_attempts': 10})}

# Initialized connection to S3-protocol storage
self.s3 = boto3.client(aws_access_key_id=s3_access_key_id,
aws_secret_access_key=s3_secret_access_key,
endpoint_url=endpoint_url,
service_name='s3')
self.s3 = boto3.client(**self.boto3_client_kwargs)

# Create bucket (does nothing if exists)
self.s3.create_bucket(Bucket=BUCKET_NAME)
# self.s3.create_bucket(Bucket=BUCKET_NAME)

# Setup backends for reading
self.backends = [S3Backend(aws_access_key_id=s3_access_key_id,
aws_secret_access_key=s3_secret_access_key,
endpoint_url=endpoint_url)]
self.backends = [S3Backend(**self.boto3_client_kwargs), ]

def _find(self, key, write, fuzzy_for, fuzzy_for_options):
"""Determine if data exists
Expand All @@ -88,26 +92,26 @@ def _find(self, key, write, fuzzy_for, fuzzy_for_options):
if fuzzy_for or fuzzy_for_options:
raise NotImplementedError("Can't do fuzzy with S3")

# Check exact match / write case
key_str = str(key)
bk = self.backend_key(key_str)

# See if any objects exist for this key
objects_list = self.s3.list_objects(Bucket=BUCKET_NAME,
Prefix=key_str)
if 'Contents' in objects_list:
if write and not self._can_overwrite(key):
raise strax.DataExistsError(at=bk)
return bk
else:
# No objects yet...
if write:
return bk
try:
self.backends[0].get_metadata(key)
except ClientError as ex:
if ex.response['Error']['Code'] == 'NoSuchKey':
if write:
return bk
else:
raise strax.DataNotAvailable
else:
# If reading and no objects, then problem
raise strax.DataNotAvailable
raise ex

if write and not self._can_overwrite(key):
raise strax.DataExistsError(at=bk)
return bk

def backend_key(self, key_str):

return self.backends[0].__class__.__name__, key_str

def remove(self, key):
Expand All @@ -125,8 +129,7 @@ def __init__(self, **kwargs):
super().__init__()

# Initialized connection to S3-protocol storage
self.s3 = boto3.client(**kwargs,
service_name='s3')
self.s3 = boto3.client(**kwargs)
self.kwargs = kwargs # Used later for setting up Saver

def get_metadata(self, backend_key):
Expand Down Expand Up @@ -169,12 +172,14 @@ class S3Saver(strax.Saver):
def __init__(self, key, metadata, meta_only,
**kwargs):
super().__init__(metadata, meta_only)
self.s3 = boto3.client(**kwargs,
service_name='s3')
self.s3 = boto3.client(**kwargs)

# Unique key specifying processing of a run
self.strax_unique_key = key

self.config = boto3.s3.transfer.TransferConfig(max_concurrency=40,
num_download_attempts=30)

def _save_chunk(self, data, chunk_info):
# Keyname
key_name = f"{self.strax_unique_key}/{chunk_info['chunk_i']:06d}"
Expand All @@ -187,7 +192,8 @@ def _save_chunk(self, data, chunk_info):
f.seek(0)
self.s3.upload_fileobj(f,
BUCKET_NAME,
key_name)
key_name,
Config=self.config)

return dict(key_name=key_name,
filesize=filesize)
Expand All @@ -207,6 +213,7 @@ def _close(self):
prefix = f'{self.strax_unique_key}/metadata_'
objects_list = self.s3.list_objects(Bucket=BUCKET_NAME,
Prefix=prefix)

if 'Contents' in objects_list:
for file in objects_list['Contents']:
# Grab chunk metadata as ASCIII
Expand Down
1 change: 1 addition & 0 deletions strax/xenon/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
from . import common, plugins, cut_plugins, pax_interface # noqa
from .rundb import *
73 changes: 73 additions & 0 deletions strax/xenon/rundb.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import os
import re
import socket

import pymongo

import strax

export, __all__ = strax.exporter()


@export
class RunDB(strax.StorageFrontend):
"""Frontend that searches RunDB MongoDB for data.
Loads appropriate backends ranging from Files to S3.
"""

def __init__(self,
mongo_url,
path='.',
s3_kwargs={},
*args, **kwargs):

super().__init__(*args, **kwargs)

self.client = pymongo.MongoClient(mongo_url)
self.collection = self.client['xenon1t']['runs']

self.path = path

# Setup backends for reading. Don't change order!
self.backends = [
strax.S3Backend(**s3_kwargs),
]

# This mess tries to identify the cluster
self.dali = True if re.match('^dali.*rcc.*', socket.getfqdn()) else False
if self.dali:
self.backends.append(strax.FileSytemBackend())

def _find(self, key: strax.DataKey, write, fuzzy_for, fuzzy_for_options):
if fuzzy_for or fuzzy_for_options:
raise NotImplementedError("Can't do fuzzy with RunDB yet.")

query = {'name': key.run_id,
'data.type': key.data_type,
'$or': [{'data.host': 'ceph-s3'}]}

if self.dali:
query['$or'].append({'data.host': 'dali'})

doc = self.collection.find_one(query,
{'data': {'$elemMatch': {'type': key.data_type,
'meta.lineage': key.lineage}}
})

if (doc is None) or ('data' not in doc) or (len(doc['data']) == 0):
if write:
return self.backends[-1].__class__.__name__, os.path.join(self.path, str(key))
else:
# If reading and no objects, then problem
raise strax.DataNotAvailable

datum = doc['data'][0]

if write and not self._can_overwrite(key):
raise strax.DataExistsError(at=datum['protocol'])

return datum['protocol'], datum['location']

def remove(self, key):
raise NotImplementedError()

0 comments on commit 2da113c

Please sign in to comment.