Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Scada interface #318

Merged
merged 12 commits into from
Oct 23, 2020
64 changes: 46 additions & 18 deletions strax/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -764,15 +764,31 @@ def concat_loader(*args, **kwargs):
def estimate_run_start(self, run_id, targets=None):
    """Return run start time in ns since epoch.

    This fetches from run metadata, and if this fails, it
    estimates it using data metadata from targets.

    :param run_id: run id to get
    :param targets: data types. Used only if run metadata is
        unavailable, so the start time has to be estimated from data.

    .. deprecated:: use :meth:`estimate_run_start_and_end` instead.
    """
    # NOTE: fixed typo in the user-facing message ("feature" -> "future").
    warnings.warn('This function is outdated and will be removed in '
                  'one of the future strax releases. Please use '
                  '"estimate_run_start_and_end" instead.', UserWarning)
    # Delegate to the combined estimator and discard the end time.
    start, _ = self.estimate_run_start_and_end(run_id, targets=targets)
    return start

def estimate_run_start_and_end(self, run_id, targets=None):
"""Return run start and end time in ns since epoch.

This fetches from run metadata, and if this fails, it
estimates it using data metadata from targets.
"""
try:
# Use run metadata, if it is available, to get
# the run start time (floored to seconds)
t0 = self.run_metadata(run_id, 'start')['start']
t0 = t0.replace(tzinfo=datetime.timezone.utc)
return int(t0.timestamp()) * int(1e9)
res = []
for i in ('start', 'end'):
# Use run metadata, if it is available, to get
# the run start time (floored to seconds)
t = self.run_metadata(run_id, i)[i]
t = t.replace(tzinfo=datetime.timezone.utc)
t = int(t.timestamp()) * int(1e9)
res.append(t)
return res
except (strax.RunMetadataNotAvailable, KeyError):
pass
# Get an approx start from the data itself,
Expand All @@ -781,36 +797,45 @@ def estimate_run_start(self, run_id, targets=None):
for t in strax.to_str_tuple(targets):
try:
t0 = self.get_meta(run_id, t)['chunks'][0]['start']
return (int(t0) // int(1e9)) * int(1e9)
t0 = (int(t0) // int(1e9)) * int(1e9)

t1 = self.get_meta(run_id, t)['chunks'][-1]['end']
t1 = (int(t1) // int(1e9)) * int(1e9)
return t0, t1
except strax.DataNotAvailable:
pass
warnings.warn(
"Could not estimate run start time from "
"run metadata: assuming it is 0",
"Could not estimate run start and end time from "
"run metadata: assuming it is 0 and inf",
UserWarning)
return 0

return 0, float('inf')

def to_absolute_time_range(self, run_id, targets, time_range=None,
seconds_range=None, time_within=None):
def to_absolute_time_range(self, run_id, targets=None, time_range=None,
seconds_range=None, time_within=None,
full_range=None):
"""Return (start, stop) time in ns since unix epoch corresponding
to time range.

:param run_id: run id to get
:param time_range: (start, stop) time in ns since unix epoch.
Will be returned without modification
:param targets: data types. Used only if run metadata is unavailable,
so run start time has to be estimated from data.
:param seconds_range: (start, stop) seconds since start of run
:param time_within: row of strax data (e.g. event)
:param full_range: If True returns full time_range of the run.
"""
if ((time_range is None)
+ (seconds_range is None)
+ (time_within is None)
< 2):

selection = ((time_range is None) +
(seconds_range is None) +
(time_within is None) +
(full_range is None))
if selection < 2:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@WenzDaniel , could you make full_range a bool and update the Runtime error?
As in :
full_range=False # as kwarg
full_range==False # in selection comparison

and
" time_range, seconds_range, time_within or full_range" # in the Runtime error

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think a bool is cleaner for an option but it's not too relevant

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I corrected the raise message. About the other thing, first I thought so too. But I think Jelle did this intentionally to allow arrays as input. Otherwise things like

if np.array([start, stop]) :

would give an ambiguity error and one would have to change it in something like:

if np.any(selection) :

raise RuntimeError("Pass no more than one one of"
" time_range, seconds_range, ot time_within")
" time_range, seconds_range, time_within"
", or full_range")
if seconds_range is not None:
t0 = self.estimate_run_start(run_id, targets)
t0, _ = self.estimate_run_start_and_end(run_id, targets)
time_range = (t0 + int(1e9 * seconds_range[0]),
t0 + int(1e9 * seconds_range[1]))
if time_within is not None:
Expand All @@ -819,6 +844,9 @@ def to_absolute_time_range(self, run_id, targets, time_range=None,
# Force time range to be integers, since float math on large numbers
# in not precise
time_range = tuple([int(x) for x in time_range])

if full_range:
time_range = self.estimate_run_start_and_end(run_id, targets)
return time_range

def get_iter(self, run_id: str,
Expand Down
50 changes: 26 additions & 24 deletions strax/storage/mongo.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""I/O format for MongoDB

This plugin is designed with data monitoring in mind, to put smaller
amounds of extracted data into a database for quick access. However
amounts of extracted data into a database for quick access. However
it should work with any plugin.

Note that there is no check to make sure the 16MB document size
Expand Down Expand Up @@ -37,14 +37,16 @@ def _read_chunk(self, backend_key, chunk_info, dtype, compressor):
"""See strax.Backend"""
chunk_i = chunk_info["chunk_i"]

# Build the chunk-registry if not done already.
if self.chunks_registry is None:
# Build the chunk-registry if not done already (NB: when asking
# for chunk_i==0, reset, otherwise we cannot load data a second
# time e.g. with allow_incomplete = True).
if self.chunks_registry is None or chunk_i == 0:
self._build_chunk_registry(backend_key)

# Unpack info about this chunk from the query. Return empty if
# not available. Use a *string* in the registry to lookup the
# chunk-data (like we do in _build_chunk_registry).
doc = self.chunks_registry.get(str(chunk_i), None)
doc = self.chunks_registry.get(backend_key + str(chunk_i), None)

if doc is None:
# Did not find the data. NB: can be that the query is off in
Expand All @@ -55,7 +57,8 @@ def _read_chunk(self, backend_key, chunk_info, dtype, compressor):
else:
chunk_doc = doc.get('data', None)
if chunk_doc is None:
raise ValueError(f'Doc for chunk_{chunk_i} in wrong format:\n{doc}')
raise ValueError(
f'Doc for chunk_{chunk_i} in wrong format:\n{doc}')

# Convert JSON to numpy
chunk_len = len(chunk_doc)
Expand Down Expand Up @@ -91,13 +94,12 @@ def _build_chunk_registry(self, backend_key):

query = backend_key_to_query(backend_key)
chunks_registry = self.db[self.col_name].find(
{**query, **{"chunk_i": {'$exists':True}}},
{**query, 'chunk_i': {'$exists': True}},
{"chunk_i": 1, "data": 1})

# We are going to convert this to a dictionary as that is
# easier to lookup
self.chunks_registry = {}

for doc in chunks_registry:
chunk_key = doc.get('chunk_i', None)
if chunk_key is None:
Expand All @@ -108,7 +110,7 @@ def _build_chunk_registry(self, backend_key):
# Update our registry with this chunks info. Use chunk_i as
# chunk_key. Make it a *string* to avoid potential key-error
# issues or json-encoding headaches.
self.chunks_registry[str(chunk_key)] = doc.copy()
self.chunks_registry[backend_key + str(chunk_key)] = doc.copy()


@export
Expand Down Expand Up @@ -158,21 +160,21 @@ def __init__(self, key, metadata, col):
"""
super().__init__(metadata)
self.col = col
# Parse basic properties for online document by forcing keys in specific
# representations (rep)
# Parse basic properties for online document by forcing keys in
# specific representations (rep)
basic_meta = {}
for k, rep in (
('run_id', int), ('data_type', str), ('lineage_hash', str)):
basic_meta[k.replace('run_id', 'number')] = rep(self.md[k])
# Add datetime objects as candidates for TTL collections. Either can
# be used according to the preference of the user to index.
# Add datetime objects as candidates for TTL collections. Either
# can be used according to the preference of the user to index.
# Two entries can be used:
# 1. The time of writing.
# 2. The time of data taking.
basic_meta['write_time'] = datetime.now(py_utc)
# The run_start_time below is a placeholder and will be updated in the
# _save_chunk_metadata for the first chunk. Nevertheless we need an
# object in case there e.g. is no chunk.
# The run_start_time below is a placeholder and will be updated
# in the _save_chunk_metadata for the first chunk. Nevertheless
# we need an object in case there e.g. is no chunk.
basic_meta['run_start_time'] = datetime.now(py_utc)
# If available later update with this value:
self.run_start = None
Expand All @@ -193,17 +195,17 @@ def _save_chunk(self, data, chunk_info, executor=None):
chunk_i = chunk_info['chunk_i']

aggregate_data = []
# Remove the numpy structures and parse the data. The dtype information
# is saved with the metadata so don't worry
# Remove the numpy structures and parse the data. The dtype
# information is saved with the metadata so don't worry
for row in data:
ins = {}
for key in list(data.dtype.names):
ins[key] = row[key]
ins = remove_np(ins)
aggregate_data.append(ins)

# Get the document to update, if none available start a new one for
# this chunk
# Get the document to update, if none available start a new one
# for this chunk
chunk_id = self.ids_chunk.get(chunk_i, None)
if chunk_id is not None:
self.col.update_one({'_id': chunk_id},
Expand All @@ -222,19 +224,19 @@ def _save_chunk(self, data, chunk_info, executor=None):

def _save_chunk_metadata(self, chunk_info):
"""see strax.Saver"""
# For the first chunk we get the run_start_time and update the run-metadata file
# For the first chunk we get the run_start_time and update the
# run-metadata file
if int(chunk_info['chunk_i']) == 0:
self.run_start = datetime.fromtimestamp(
chunk_info['start']/1e9).replace(tzinfo=py_utc)

self.col.update_one({'_id': self.id_md},
{'$addToSet':
{'metadata.chunks': chunk_info}})
{'$addToSet': {'metadata.chunks': chunk_info}})

def _close(self):
"""see strax.Saver"""
# First update the run-starts of all of the chunk-documents as this is
# a TTL index -candidate
# First update the run-starts of all of the chunk-documents as
# this is a TTL index-candidate
if self.run_start is not None:
update = {'run_start_time': self.run_start}
query = {k: v for k, v in self.basic_md.items()
Expand Down