diff --git a/strax/corrections.py b/strax/corrections.py
index a6e33b1d2..8c9497fcb 100644
--- a/strax/corrections.py
+++ b/strax/corrections.py
@@ -67,8 +67,10 @@ def read(self, correction):
         del df['_id']
         df['time'] = pd.to_datetime(df['time'], utc=True)
-
-        return df.set_index('time')
+        df = df.set_index('time')
+        df = df.sort_index()
+
+        return df
 
     def interpolate(self, what, when, how='interpolate', **kwargs):
         """
diff --git a/strax/plugin.py b/strax/plugin.py
index 03eb4c414..84e965e0d 100644
--- a/strax/plugin.py
+++ b/strax/plugin.py
@@ -180,8 +180,18 @@ def __repr__(self):
         return self.__class__.__name__
 
     def dtype_for(self, data_type):
+        """
+        Return the dtype of one of the data types this plugin provides.
+        NB: this does not look up the dtype of an arbitrary data type;
+        data_type must be one of the provide arguments known to the plugin.
+        """
         if self.multi_output:
-            return self.dtype[data_type]
+            if data_type in self.dtype:
+                return self.dtype[data_type]
+            else:
+                raise ValueError('dtype_for returns the dtype of one of the '
+                                 'provide data types specified in this plugin; '
+                                 f'{data_type} is not provided by this plugin.')
         return self.dtype
 
     def can_rechunk(self, data_type):
diff --git a/strax/run_selection.py b/strax/run_selection.py
index ef9ab3f3b..f543e08bc 100644
--- a/strax/run_selection.py
+++ b/strax/run_selection.py
@@ -209,9 +209,13 @@ def select_runs(self, run_mode=None, run_id=None,
         for d in have_available:
             if not d + '_available' in dsets.columns:
                 # Get extra availability info from the run db
-                self.runs[d + '_available'] = np.in1d(
-                    self.runs.name.values,
-                    self.list_available(d))
+                d_available = np.in1d(self.runs.name.values,
+                                      self.list_available(d))
+                # Save the availability both in the context and in this
+                # selection, for use with available = ('data_type',)
+                self.runs[d + '_available'] = d_available
+                dsets[d + '_available'] = d_available
+
+        for d in have_available:
             dsets = dsets[dsets[d + '_available']]
         return dsets
diff --git a/strax/storage/common.py b/strax/storage/common.py
index 867ccfbbe..82ccaddbe 100644
--- a/strax/storage/common.py
+++ b/strax/storage/common.py
@@ -126,6 +126,19 @@ def __init__(self,
         self.readonly = readonly
         self.log = logging.getLogger(self.__class__.__name__)
 
+    def __str__(self):
+        return self.__repr__()
+
+    def __repr__(self):
+        # List the relevant attributes ('path' is actually for
+        # strax.DataDirectory, but it makes more sense to put it here).
+        attributes = ('readonly', 'path', 'exclude', 'take_only')
+        representation = f'{self.__class__.__module__}.{self.__class__.__name__}'
+        for attr in attributes:
+            if hasattr(self, attr) and getattr(self, attr):
+                representation += f', {attr}: {getattr(self, attr)}'
+        return representation
+
     def loader(self, key: DataKey,
                time_range=None,
                allow_incomplete=False,
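To illustrate the `StorageFrontend.__repr__` added above: a minimal sketch using `strax.DataDirectory` (the `./strax_data` path is only an example, and the printed line is indicative rather than verbatim output):

```python
import strax

# Hypothetical read-only local frontend; any StorageFrontend subclass
# inherits the new __repr__.
frontend = strax.DataDirectory('./strax_data', readonly=True)

# Printing now lists the module, class and any truthy attributes among
# ('readonly', 'path', 'exclude', 'take_only') instead of the default
# <... object at 0x...> form.
print(frontend)
# strax.storage.files.DataDirectory, readonly: True, path: ./strax_data
```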
diff --git a/strax/storage/mongo.py b/strax/storage/mongo.py
index c1c0bda44..5d563c336 100644
--- a/strax/storage/mongo.py
+++ b/strax/storage/mongo.py
@@ -14,6 +14,7 @@
 from strax import StorageFrontend, StorageBackend, Saver
 from datetime import datetime
 from pytz import utc as py_utc
+from warnings import warn
 
 export, __all__ = strax.exporter()
 
@@ -194,6 +195,11 @@ def _save_chunk(self, data, chunk_info, executor=None):
         """see strax.Saver"""
         chunk_i = chunk_info['chunk_i']
 
+        if getattr(data, 'nbytes') > 10_000_000:
+            warn('Inserting documents of size > 10 MB, this is getting '
+                 'close to the 16 MB document size limit in MongoDB',
+                 UserWarning)
+
         aggregate_data = []
         # Remove the numpy structures and parse the data. The dtype
         # information is saved with the metadata so don't worry
@@ -207,9 +213,16 @@ def _save_chunk(self, data, chunk_info, executor=None):
         # Get the document to update, if none available start a new one
         # for this chunk
         chunk_id = self.ids_chunk.get(chunk_i, None)
+
+        # We can fail here if the document is too large to be written
+        # out to Mongo. One could do a try/except
+        # pymongo.errors.WriteError: pass, but that could let abuse of
+        # a Mongo instance go unnoticed.
         if chunk_id is not None:
+            # In principle we should not end up here, as each chunk
+            # should be its own document unless you re-chunk
             self.col.update_one({'_id': chunk_id},
-                                {'$addToSet': {f'data': aggregate_data}})
+                                {'$push': {f'data': aggregate_data}})
         else:
             # Start a new document, update it with the proper information
             doc = self.basic_md.copy()
diff --git a/strax/storage/s3.py b/strax/storage/s3.py
index 83f2028d0..3a74ef0fc 100644
--- a/strax/storage/s3.py
+++ b/strax/storage/s3.py
@@ -19,7 +19,7 @@
 import boto3
 from botocore.client import Config
 from botocore.exceptions import ClientError
-
+from warnings import warn
 import strax
 from strax import StorageFrontend
 
@@ -58,6 +58,13 @@ def __init__(self,
 
         super().__init__(*args, **kwargs)
 
+        # S3 storage will be removed in the near future unless there are
+        # objections in https://github.com/AxFoundation/strax/issues/307
+        warn('S3 storage is not used in strax and will be removed in '
+             '2021. If there are objections, please refer to '
+             'https://github.com/AxFoundation/strax/issues/307',
+             DeprecationWarning)
+
         # Get S3 protocol credentials
         if s3_access_key_id is None:
            if 'S3_ACCESS_KEY_ID' not in os.environ:
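The 10 MB warning added to `_save_chunk` above uses numpy's `nbytes`, the in-memory size of the structured array, as a proxy for the size of the eventual BSON document (MongoDB rejects documents above 16 MB). A self-contained sketch of the check, with an illustrative dtype and record count:

```python
import numpy as np
from warnings import warn

# 1_000_000 records of 12 bytes each = 12 MB, above the 10 MB threshold.
data = np.zeros(1_000_000, dtype=[('time', np.int64), ('value', np.float32)])

if data.nbytes > 10_000_000:
    warn('Inserting documents of size > 10 MB, this is getting '
         'close to the 16 MB document size limit in MongoDB',
         UserWarning)
```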
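The `$addToSet` to `$push` change matters because `$addToSet` treats the array as a set: an update with a payload identical to one already stored is a silent no-op, so legitimately re-written chunk data could disappear, while `$push` appends unconditionally. A minimal pymongo sketch (the connection string, database, and collection names are hypothetical):

```python
from pymongo import MongoClient

col = MongoClient('mongodb://localhost:27017')['test_db']['test_col']
doc_id = col.insert_one({'data': []}).inserted_id
payload = [{'time': 0, 'value': 1}]

# $addToSet: the second, identical update is silently dropped.
col.update_one({'_id': doc_id}, {'$addToSet': {'data': payload}})
col.update_one({'_id': doc_id}, {'$addToSet': {'data': payload}})
assert len(col.find_one({'_id': doc_id})['data']) == 1

# $push appends every time, which is the behaviour a saver that may
# write identical payloads needs.
col.update_one({'_id': doc_id}, {'$push': {'data': payload}})
col.update_one({'_id': doc_id}, {'$push': {'data': payload}})
assert len(col.find_one({'_id': doc_id})['data']) == 3
```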