Cleanup and ignore old events #48

Merged 5 commits on Jun 25, 2024
5 changes: 0 additions & 5 deletions .gitignore
@@ -1,13 +1,8 @@
# Specific
.remote-sync.json
station_list.csv
config.ini
writer_app.py
*.h5
_build/
build/


# Python
*.pyc
*.pyo
6 changes: 5 additions & 1 deletion writer/store_events.py
@@ -38,7 +38,7 @@ def store_event(datafile, cluster, station_id, event):
    nanoseconds = eventheader['nanoseconds']
    # make an extended timestamp, which is the number of nanoseconds since
    # epoch
-    ext_timestamp = timestamp * int(1e9) + nanoseconds
+    ext_timestamp = timestamp * 1_000_000_000 + nanoseconds
    row['timestamp'] = timestamp

    if upload_codes['_has_ext_time']:
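A quick sanity check on the rewritten constant (the values below are invented for illustration): `int(1e9)` and `1_000_000_000` are the exact same integer, so the new literal only improves readability.

```python
# 1e9 is exactly representable as a float, so both spellings give the same int.
assert int(1e9) == 1_000_000_000

# Hypothetical GPS timestamp, for illustration only.
timestamp = 1_719_273_600  # 2024-06-25 00:00:00 UTC, in seconds since epoch
nanoseconds = 123_456_789
ext_timestamp = timestamp * 1_000_000_000 + nanoseconds
assert ext_timestamp == 1_719_273_600_123_456_789
```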
@@ -111,13 +111,17 @@ def data_is_blob(uploadcode, blob_types):
def store_event_list(data_dir, station_id, cluster, event_list):
    """Store a list of events"""

+    minimum_year = 2020
    prev_date = None
    datafile = None
    for event in event_list:
        try:
            timestamp = event['header']['datetime']
            if timestamp:
                date = timestamp.date()
+                if date.year < minimum_year:
+                    logger.error(f'Old event ({date}), discarding event (station: {station_id})')
+                    continue
                if date != prev_date:
                    if datafile:
                        datafile.close()
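For context, a minimal sketch of what the new guard does; `minimum_year`, the log message, and the `continue` come from the diff above, while the station id and event dates are hypothetical.

```python
import datetime
import logging

logging.basicConfig()
logger = logging.getLogger('writer')

minimum_year = 2020
station_id = 501  # hypothetical station id

for dt in (datetime.datetime(2019, 12, 31, 23, 59), datetime.datetime(2024, 6, 25, 12, 0)):
    date = dt.date()
    if date.year < minimum_year:
        # mirrors the new branch in store_event_list(): log and skip the event
        logger.error(f'Old event ({date}), discarding event (station: {station_id})')
        continue
    print(f'storing event dated {date}')  # stands in for the real write path
```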
42 changes: 20 additions & 22 deletions writer/writer_app.py
@@ -1,28 +1,30 @@
""" datastore writer application
"""datastore writer application

This module empties the station data `incoming` queue and writes the
data into HDF5 files using PyTables.
This module empties the station data `incoming` queue and writes the
data into HDF5 files using PyTables.

"""

import configparser
import logging
import logging.handlers
import os
-import pickle as pickle
+import pickle
import shutil
import time

from writer.store_events import store_event_list

-LEVELS = {'debug': logging.DEBUG,
-          'info': logging.INFO,
-          'warning': logging.WARNING,
-          'error': logging.ERROR,
-          'critical': logging.CRITICAL}
+LEVELS = {
+    'debug': logging.DEBUG,
+    'info': logging.INFO,
+    'warning': logging.WARNING,
+    'error': logging.ERROR,
+    'critical': logging.CRITICAL,
+}

logger = logging.getLogger('writer')
-formatter = logging.Formatter('%(asctime)s %(name)s[%(process)d]'
-                              '.%(funcName)s.%(levelname)s: %(message)s')
+formatter = logging.Formatter('%(asctime)s %(name)s[%(process)d].%(funcName)s.%(levelname)s: %(message)s')


def writer(configfile):
@@ -45,17 +47,14 @@ def writer(configfile):

    # set up logger
    file = config.get('General', 'log') + '-writer'
-    handler = logging.handlers.TimedRotatingFileHandler(file,
-                                                        when='midnight',
-                                                        backupCount=14)
+    handler = logging.handlers.TimedRotatingFileHandler(file, when='midnight', backupCount=14)
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    level = LEVELS.get(config.get('General', 'loglevel'), logging.NOTSET)
    logger.setLevel(level=level)

    queue = os.path.join(config.get('General', 'data_dir'), 'incoming')
-    partial_queue = os.path.join(config.get('General', 'data_dir'),
-                                 'partial')
+    partial_queue = os.path.join(config.get('General', 'data_dir'), 'partial')

    # writer process
    try:
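For reference, a minimal sketch of a config accepted by `writer()`; the `General` section and the `log`, `loglevel` and `data_dir` option names come from the code above, the values are made up.

```python
import configparser

config = configparser.ConfigParser()
config.read_string("""
[General]
# writer() appends '-writer' to this path
log = /tmp/datastore
# must be a key of the LEVELS dict, else the level falls back to NOTSET
loglevel = info
# the 'incoming' and 'partial' queues live under this directory
data_dir = /tmp/data
""")
assert config.get('General', 'loglevel') == 'info'
```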
@@ -85,19 +84,18 @@ def process_data(file):
            logger.debug('Data seems to be pickled using python 2. Decoding.')
            data = decode_object(pickle.load(handle, encoding='bytes'))

-    logger.debug('Processing data for station %d' % data['station_id'])
-    store_event_list(config.get('General', 'data_dir'),
-                     data['station_id'], data['cluster'], data['event_list'])
+    logger.debug(f"Processing data for station {data['station_id']}")
+    store_event_list(config.get('General', 'data_dir'), data['station_id'], data['cluster'], data['event_list'])


def decode_object(o):
"""recursively decode all bytestrings in object"""

if type(o) is bytes:
if isinstance(o, bytes):
return o.decode()
elif type(o) is dict:
elif isinstance(o, dict):
return {decode_object(k): decode_object(v) for k, v in o.items()}
elif type(o) is list:
elif isinstance(o, list):
return [decode_object(obj) for obj in o]
else:
return o
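Besides being more idiomatic, `isinstance` also accepts subclasses, which the `type(...) is ...` checks did not. A usage sketch, with a hypothetical payload as it would come out of a Python 2 pickle loaded with `encoding='bytes'`:

```python
# Hypothetical payload: keys and bytestring values all arrive as bytes.
payload = {b'station_id': 501, b'cluster': b'...', b'event_list': [b'...']}

print(decode_object(payload))
# -> {'station_id': 501, 'cluster': '...', 'event_list': ['...']}
```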