Commit
Merge branch 'master' into fix_timeselections
JoranAngevaare committed Nov 2, 2020
2 parents 3c34338 + 887ee18 commit 96be0a6
Showing 6 changed files with 57 additions and 8 deletions.
6 changes: 4 additions & 2 deletions strax/corrections.py
@@ -67,8 +67,10 @@ def read(self, correction):
del df['_id']

df['time'] = pd.to_datetime(df['time'], utc=True)

return df.set_index('time')
df = df.set_index('time')
df = df.sort_index()

return df

def interpolate(self, what, when, how='interpolate', **kwargs):
"""
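The change above sorts the corrections DataFrame on its time index before returning it. A minimal pandas sketch (not part of the commit; the dates and values are made up) of why an unsorted DatetimeIndex is a problem for time-based lookups:

```python
import pandas as pd

# Hypothetical correction values returned out of chronological order,
# as documents may come back from the database without an explicit sort.
df = pd.DataFrame({
    'time': pd.to_datetime(['2020-03-01', '2020-01-01', '2020-02-01'], utc=True),
    'value': [3.0, 1.0, 2.0]})

df = df.set_index('time')
# Label-based time slicing needs a monotonic index; without sort_index()
# the slice below raises a KeyError on the non-monotonic DatetimeIndex.
df = df.sort_index()
print(df.loc['2020-01-15':'2020-02-15'])
```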
12 changes: 11 additions & 1 deletion strax/plugin.py
@@ -180,8 +180,18 @@ def __repr__(self):
return self.__class__.__name__

def dtype_for(self, data_type):
"""
Provide the dtype of one of the provide arguments of the plugin.
NB: does not simply provide the dtype of any datatype but must
be one of the provide arguments known to the plugin.
"""
if self.multi_output:
return self.dtype[data_type]
if data_type in self.dtype:
return self.dtype[data_type]
else:
raise ValueError(f'dtype_for provides the dtype of one of the '
f'provide datatypes specified in this plugin; '
f'{data_type} is not provided by this plugin.')
return self.dtype

def can_rechunk(self, data_type):
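A minimal sketch (not from the commit) of how `dtype_for` now behaves for a hypothetical multi-output plugin: a known data type returns its dtype, an unknown one raises a descriptive ValueError instead of a bare KeyError.

```python
class FakeMultiOutputPlugin:
    """Stand-in for a strax multi-output plugin; names are illustrative."""
    multi_output = True
    dtype = {'peaks': [('time', 'int64')],
             'peak_basics': [('area', 'float64')]}

    def dtype_for(self, data_type):
        if self.multi_output:
            if data_type in self.dtype:
                return self.dtype[data_type]
            raise ValueError(f'dtype_for provides the dtype of one of the '
                             f'provide datatypes specified in this plugin; '
                             f'{data_type} is not provided by this plugin.')
        return self.dtype


plugin = FakeMultiOutputPlugin()
print(plugin.dtype_for('peaks'))   # [('time', 'int64')]
# plugin.dtype_for('events')       # -> ValueError with a clear message
```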
10 changes: 7 additions & 3 deletions strax/run_selection.py
@@ -209,9 +209,13 @@ def select_runs(self, run_mode=None, run_id=None,
for d in have_available:
if not d + '_available' in dsets.columns:
# Get extra availability info from the run db
self.runs[d + '_available'] = np.in1d(
self.runs.name.values,
self.list_available(d))
d_available = np.in1d(self.runs.name.values,
self.list_available(d))
# Save both in the context and for this selection using
# available = ('data_type',)
self.runs[d + '_available'] = d_available
dsets[d + '_available'] = d_available
for d in have_available:
dsets = dsets[dsets[d + '_available']]

return dsets
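A sketch of how this availability caching is typically exercised, assuming an already configured strax context; the storage path and the data type name ('raw_records') are illustrative:

```python
import strax

st = strax.Context(storage=[strax.DataDirectory('./strax_data')])
# The '<data_type>_available' column is now written both to st.runs and to
# the DataFrame returned for this selection, so repeated select_runs calls
# do not have to query the storage frontends again.
runs = st.select_runs(available=('raw_records',))
print(runs[['name', 'raw_records_available']].head())
```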
13 changes: 13 additions & 0 deletions strax/storage/common.py
@@ -126,6 +126,19 @@ def __init__(self,
self.readonly = readonly
self.log = logging.getLogger(self.__class__.__name__)

def __str__(self):
return self.__repr__()

def __repr__(self):
# List the relevant attributes ('path' is actually for the
# strax.DataDirectory but it makes more sense to put it here).
attributes = ('readonly', 'path', 'exclude', 'take_only')
representation = f'{self.__class__.__module__}.{self.__class__.__name__}'
for attr in attributes:
if hasattr(self, attr) and getattr(self, attr):
representation += f', {attr}: {getattr(self, attr)}'
return representation

def loader(self, key: DataKey,
time_range=None,
allow_incomplete=False,
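With `__str__` and `__repr__` defined on the storage base class, printing a frontend now shows its module, class and the relevant attributes instead of a bare object id. A sketch, assuming a local DataDirectory frontend:

```python
import strax

sf = strax.DataDirectory('./strax_data', readonly=True)
print(sf)
# Expected to print something along the lines of:
#   strax.storage.files.DataDirectory, readonly: True, path: ./strax_data
# (exactly which attributes appear depends on which ones are set).
```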
15 changes: 14 additions & 1 deletion strax/storage/mongo.py
@@ -14,6 +14,7 @@
from strax import StorageFrontend, StorageBackend, Saver
from datetime import datetime
from pytz import utc as py_utc
from warnings import warn
export, __all__ = strax.exporter()


@@ -194,6 +195,11 @@ def _save_chunk(self, data, chunk_info, executor=None):
"""see strax.Saver"""
chunk_i = chunk_info['chunk_i']

if getattr(data, 'nbytes') > 10_000_000:
warn('Inserting documents of size > 10 MB, this is getting '
'close to the 16 MB document size limit in MongoDB',
UserWarning)

aggregate_data = []
# Remove the numpy structures and parse the data. The dtype
# information is saved with the metadata so don't worry
@@ -207,9 +213,16 @@
# Get the document to update, if none available start a new one
# for this chunk
chunk_id = self.ids_chunk.get(chunk_i, None)

# We can fail here if the document is too large to be written
# out to mongo. One could catch pymongo.errors.WriteError and pass,
# but that could let misuse of the Mongo instance go unnoticed.
if chunk_id is not None:
# In principle this should not end up here as each chunk
# should be its own document unless you re-chunk
self.col.update_one({'_id': chunk_id},
{'$addToSet': {f'data': aggregate_data}})
{'$push': {f'data': aggregate_data}})
else:
# Start a new document, update it with the proper information
doc = self.basic_md.copy()
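The switch from `$addToSet` to `$push` matters because `$addToSet` silently drops an element that is equal to one already in the array, so identical chunks of data could be lost on re-chunking; `$push` always appends. An illustrative pymongo sketch (connection string and collection names are placeholders):

```python
from pymongo import MongoClient

client = MongoClient('mongodb://localhost:27017/')   # placeholder instance
col = client['test_db']['test_collection']

doc_id = col.insert_one({'data': []}).inserted_id
col.update_one({'_id': doc_id}, {'$addToSet': {'data': [1, 2, 3]}})
col.update_one({'_id': doc_id}, {'$addToSet': {'data': [1, 2, 3]}})  # silently ignored
col.update_one({'_id': doc_id}, {'$push': {'data': [1, 2, 3]}})      # always appended
print(col.find_one(doc_id)['data'])   # [[1, 2, 3], [1, 2, 3]]
```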
9 changes: 8 additions & 1 deletion strax/storage/s3.py
@@ -19,7 +19,7 @@
import boto3
from botocore.client import Config
from botocore.exceptions import ClientError

from warnings import warn
import strax
from strax import StorageFrontend

@@ -58,6 +58,13 @@ def __init__(self,

super().__init__(*args, **kwargs)

# S3 storage will be removed in the near future unless one objects in
# https://github.com/AxFoundation/strax/issues/307
warn('DeprecationWarning S3 storage is not used in strax and will be '
'removed in 2021. If there are objections please refer to '
'https://github.com/AxFoundation/strax/issues/307',
DeprecationWarning)

# Get S3 protocol credentials
if s3_access_key_id is None:
if 'S3_ACCESS_KEY_ID' not in os.environ:
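Note that Python filters `DeprecationWarning` out by default for code that is not `__main__`, so users only see this notice if warnings are enabled. A generic sketch (not strax-specific) of how the warning surfaces:

```python
import warnings

# Equivalent to running `python -W default::DeprecationWarning ...`;
# without this, the notice emitted when constructing the S3 frontend
# may be filtered out.
warnings.simplefilter('default', DeprecationWarning)

# Demonstration that a DeprecationWarning is now caught and visible:
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter('always')
    warnings.warn('S3 storage ... will be removed in 2021', DeprecationWarning)
    print(caught[-1].category.__name__)   # DeprecationWarning
```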
